You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by co...@apache.org on 2010/06/21 21:02:51 UTC
svn commit: r956666 [4/4] - in /hadoop/mapreduce/trunk: ./ ivy/
src/test/aop/build/ src/test/mapred/org/apache/hadoop/mapred/
src/test/mapred/testjar/ src/test/system/ src/test/system/aop/
src/test/system/aop/org/ src/test/system/aop/org/apache/ src/te...
Added: hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestTaskKilling.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestTaskKilling.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestTaskKilling.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestTaskKilling.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,640 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
+import org.apache.hadoop.mapred.JobClient.NetworkedJob;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+/**
+ * A System test for verifying the status after killing the tasks at different
+ * conditions.
+ */
+public class TestTaskKilling {
+ private static final Log LOG = LogFactory.getLog(TestTaskKilling.class);
+ // Cluster handle and RPC clients shared by all tests; initialized once in before().
+ private static MRCluster cluster;
+ private static JobClient jobClient = null;
+ private static JTProtocol remoteJTClient = null;
+
+ public TestTaskKilling() {
+ }
+
+ @BeforeClass
+ public static void before() throws Exception {
+ // Bring up the system-test cluster once for the whole class and cache
+ // the JobTracker client and its remote proxy.
+ Configuration conf = new Configuration();
+ cluster = MRCluster.createCluster(conf);
+ cluster.setUp();
+ jobClient = cluster.getJTClient().getClient();
+ remoteJTClient = cluster.getJTClient().getProxy();
+ }
+
+ @AfterClass
+ public static void after() throws Exception {
+ cluster.tearDown();
+ }
+
+ /**
+ * Verifying the running job status whether it succeeds or not after failing
+ * some of its tasks.
+ *
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void testFailedTaskJobStatus()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ Configuration conf = new Configuration(cluster.getConf());
+ TaskInfo taskInfo = null;
+ SleepJob job = new SleepJob();
+ job.setConf(conf);
+ Job slpJob = job.createJob(3, 1, 4000, 4000, 100, 100);
+ // NOTE(review): these attempt limits are set on a local JobConf that is
+ // never passed to the submitted job, so they have no effect on slpJob.
+ JobConf jobConf = new JobConf(conf);
+ jobConf.setMaxMapAttempts(20);
+ jobConf.setMaxReduceAttempts(20);
+ slpJob.submit();
+ RunningJob runJob =
+ jobClient.getJob(org.apache.hadoop.mapred.JobID.downgrade(slpJob
+ .getJobID()));
+ JobID id = runJob.getID();
+ JobInfo jInfo = remoteJTClient.getJobInfo(id);
+ // Poll up to 60 seconds for the job to reach RUNNING state.
+ int counter = 0;
+ while (counter < 60) {
+ if (jInfo.getStatus().getRunState() == JobStatus.RUNNING) {
+ break;
+ } else {
+ UtilsForTests.waitFor(1000);
+ jInfo = remoteJTClient.getJobInfo(id);
+ }
+ counter++;
+ }
+ Assert.assertTrue("Job has not been started for 1 min.", counter != 60);
+
+ // NOTE(review): no break inside this loop, so the LAST non-setup/cleanup
+ // task is selected (the sibling tests below break on the first match).
+ TaskInfo[] taskInfos = remoteJTClient.getTaskInfo(id);
+ for (TaskInfo taskinfo : taskInfos) {
+ if (!taskinfo.isSetupOrCleanup()) {
+ taskInfo = taskinfo;
+ }
+ }
+
+ // Poll up to 60 seconds for the chosen task's first attempt to be RUNNING.
+ counter = 0;
+ taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
+ while (counter < 60) {
+ if (taskInfo.getTaskStatus().length > 0) {
+ if (taskInfo.getTaskStatus()[0].getRunState() == TaskStatus.State.RUNNING) {
+ break;
+ }
+ }
+ UtilsForTests.waitFor(1000);
+ taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
+ counter++;
+ }
+ Assert.assertTrue("Task has not been started for 1 min.", counter != 60);
+
+ // Fail (shouldFail=false means "kill") attempt 0 of the running task; the
+ // job is expected to recover via re-attempts and still succeed.
+ NetworkedJob networkJob = jobClient.new NetworkedJob(jInfo.getStatus());
+ TaskID tID = TaskID.downgrade(taskInfo.getTaskID());
+ TaskAttemptID taskAttID = new TaskAttemptID(tID, 0);
+ networkJob.killTask(taskAttID, false);
+
+ LOG.info("Waiting till the job is completed...");
+ while (!jInfo.getStatus().isJobComplete()) {
+ UtilsForTests.waitFor(100);
+ jInfo = remoteJTClient.getJobInfo(id);
+ }
+
+ // NOTE(review): Assert.assertEquals takes (message, expected, actual);
+ // here the actual value is passed in the expected slot. Harmless for the
+ // comparison itself, but failure messages will read backwards.
+ Assert.assertEquals(
+ "JobStatus", jInfo.getStatus().getRunState(), JobStatus.SUCCEEDED);
+ }
+
+ /**
+ * Verifying whether task temporary output directory is cleaned up or not
+ * after killing the task.
+ */
+ @Test
+ public void testDirCleanupAfterTaskKilled()
+ throws IOException, InterruptedException {
+ TaskInfo taskInfo = null;
+ boolean isTempFolderExists = false;
+ String localTaskDir = null;
+ TTClient ttClient = null;
+ TaskID tID = null;
+ FileStatus filesStatus[] = null;
+ Path inputDir = new Path("input");
+ Path outputDir = new Path("output");
+ Configuration conf = new Configuration(cluster.getConf());
+ // Classic WordCount job with a single map and reduce, generous attempt
+ // limits so one kill does not fail the job.
+ JobConf jconf = new JobConf(conf);
+ jconf.setJobName("Word Count");
+ jconf.setJarByClass(WordCount.class);
+ jconf.setMapperClass(WordCount.MapClass.class);
+ jconf.setCombinerClass(WordCount.Reduce.class);
+ jconf.setReducerClass(WordCount.Reduce.class);
+ jconf.setNumMapTasks(1);
+ jconf.setNumReduceTasks(1);
+ jconf.setMaxMapAttempts(20);
+ jconf.setMaxReduceAttempts(20);
+ jconf.setOutputKeyClass(Text.class);
+ jconf.setOutputValueClass(IntWritable.class);
+
+ cleanup(inputDir, conf);
+ cleanup(outputDir, conf);
+ createInput(inputDir, conf);
+ FileInputFormat.setInputPaths(jconf, inputDir);
+ FileOutputFormat.setOutputPath(jconf, outputDir);
+ RunningJob runJob = jobClient.submitJob(jconf);
+ JobID id = runJob.getID();
+ JobInfo jInfo = remoteJTClient.getJobInfo(id);
+ // Poll up to 60 seconds for the job to reach RUNNING state.
+ int counter = 0;
+ while (counter < 60) {
+ if (jInfo.getStatus().getRunState() == JobStatus.RUNNING) {
+ break;
+ } else {
+ UtilsForTests.waitFor(1000);
+ jInfo = remoteJTClient.getJobInfo(id);
+ }
+ counter++;
+ }
+ Assert.assertTrue("Job has not been started for 1 min.", counter != 60);
+
+ // NOTE(review): assumes jobStatus[0] is this job's status; with other jobs
+ // on the cluster the user name could belong to a different job.
+ JobStatus[] jobStatus = jobClient.getAllJobs();
+ String userName = jobStatus[0].getUsername();
+ TaskInfo[] taskInfos = remoteJTClient.getTaskInfo(id);
+ for (TaskInfo taskinfo : taskInfos) {
+ if (!taskinfo.isSetupOrCleanup()) {
+ taskInfo = taskinfo;
+ break;
+ }
+ }
+
+ // Poll up to 30 seconds for the task's first attempt to be RUNNING.
+ counter = 0;
+ while (counter < 30) {
+ if (taskInfo.getTaskStatus().length > 0) {
+ if (taskInfo.getTaskStatus()[0].getRunState() == TaskStatus.State.RUNNING) {
+ break;
+ }
+ }
+ UtilsForTests.waitFor(1000);
+ taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
+ counter++;
+ }
+ Assert.assertTrue("Task has not been started for 30 sec.", counter != 30);
+
+ // FinishTaskControlAction keeps the task alive under test control so its
+ // local work directory can be inspected before the kill.
+ tID = TaskID.downgrade(taskInfo.getTaskID());
+ FinishTaskControlAction action = new FinishTaskControlAction(tID);
+
+ String[] taskTrackers = taskInfo.getTaskTrackers();
+ counter = 0;
+ while (counter < 30) {
+ if (taskTrackers.length != 0) {
+ break;
+ }
+ UtilsForTests.waitFor(100);
+ taskTrackers = taskInfo.getTaskTrackers();
+ counter++;
+ }
+
+ // NOTE(review): unlike testDirCleanupAfterTaskFailed there is no assert
+ // here that taskTrackers is non-empty; if the wait above timed out,
+ // taskTrackers[0] throws ArrayIndexOutOfBoundsException. Also note the
+ // loop re-reads a stale taskInfo (it is never refreshed from the proxy).
+ // Tracker name format is "tracker_<host>:<port>..."; extract the host.
+ String hostName = taskTrackers[0].split("_")[1];
+ hostName = hostName.split(":")[0];
+ ttClient = cluster.getTTClient(hostName);
+ ttClient.getProxy().sendAction(action);
+ // Scan each mapred.local.dir for the attempt's working directory; when
+ // found, kill the attempt and verify the directory is later removed.
+ String localDirs[] = ttClient.getMapredLocalDirs();
+ TaskAttemptID taskAttID = new TaskAttemptID(tID, 0);
+ for (String localDir : localDirs) {
+ localTaskDir =
+ localDir
+ + "/"
+ + TaskTracker.getLocalTaskDir(userName, id.toString(), taskAttID
+ .toString());
+ filesStatus = ttClient.listStatus(localTaskDir, true);
+ if (filesStatus.length > 0) {
+ isTempFolderExists = true;
+ NetworkedJob networkJob = jobClient.new NetworkedJob(jInfo.getStatus());
+ networkJob.killTask(taskAttID, false);
+ break;
+ }
+ }
+
+ Assert.assertTrue(
+ "Task Attempt directory "
+ + taskAttID + " has not been found while task was running.",
+ isTempFolderExists);
+ taskInfo = remoteJTClient.getTaskInfo(tID);
+
+ // Poll up to 60 seconds for the attempt directory to disappear.
+ counter = 0;
+ while (counter < 60) {
+ UtilsForTests.waitFor(1000);
+ taskInfo = remoteJTClient.getTaskInfo(tID);
+ filesStatus = ttClient.listStatus(localTaskDir, true);
+ if (filesStatus.length == 0) {
+ break;
+ }
+ counter++;
+ }
+
+ Assert.assertTrue(
+ "Task attempt temporary folder has not been cleaned.",
+ isTempFolderExists && filesStatus.length == 0);
+ // Unconditional 30-second wait for the KILLED state to be reported.
+ counter = 0;
+ while (counter < 30) {
+ UtilsForTests.waitFor(1000);
+ taskInfo = remoteJTClient.getTaskInfo(tID);
+ counter++;
+ }
+ taskInfo = remoteJTClient.getTaskInfo(tID);
+ Assert.assertEquals(
+ "Task status has not been changed to KILLED.", TaskStatus.State.KILLED,
+ taskInfo.getTaskStatus()[0].getRunState());
+ }
+
+ // Deletes the given directory (recursively) on its filesystem, if present.
+ private void cleanup(Path dir, Configuration conf) throws IOException {
+ FileSystem fs = dir.getFileSystem(conf);
+ fs.delete(dir, true);
+ }
+
+ // Creates inDir (world-writable) and writes a sizeable data.txt into it by
+ // repeating a fixed two-line sample 3,000,000 times.
+ private void createInput(Path inDir, Configuration conf) throws IOException {
+ String input =
+ "Hadoop is framework for data intensive distributed "
+ + "applications.\n"
+ + "Hadoop enables applications to work with thousands of nodes.";
+ FileSystem fs = inDir.getFileSystem(conf);
+ if (!fs.mkdirs(inDir)) {
+ throw new IOException("Failed to create the input directory:"
+ + inDir.toString());
+ }
+ fs.setPermission(inDir, new FsPermission(
+ FsAction.ALL, FsAction.ALL, FsAction.ALL));
+ DataOutputStream file = fs.create(new Path(inDir, "data.txt"));
+ int i = 0;
+ while (i < 1000 * 3000) {
+ file.writeBytes(input);
+ i++;
+ }
+ file.close();
+ }
+
+ /**
+ * Verifying whether task temporary output directory is cleaned up or not
+ * after failing the task.
+ */
+ @Test
+ public void testDirCleanupAfterTaskFailed()
+ throws IOException, InterruptedException {
+ TTClient ttClient = null;
+ FileStatus filesStatus[] = null;
+ String localTaskDir = null;
+ TaskInfo taskInfo = null;
+ TaskID tID = null;
+ boolean isTempFolderExists = false;
+ Path inputDir = new Path("input");
+ Path outputDir = new Path("output");
+ Configuration conf = new Configuration(cluster.getConf());
+ // Map-only job with a single attempt so FailedMapperClass's IOException
+ // fails the task (and the job) outright.
+ JobConf jconf = new JobConf(conf);
+ jconf.setJobName("Task Failed job");
+ jconf.setJarByClass(UtilsForTests.class);
+ jconf.setMapperClass(FailedMapperClass.class);
+ jconf.setNumMapTasks(1);
+ jconf.setNumReduceTasks(0);
+ jconf.setMaxMapAttempts(1);
+ cleanup(inputDir, conf);
+ cleanup(outputDir, conf);
+ createInput(inputDir, conf);
+ FileInputFormat.setInputPaths(jconf, inputDir);
+ FileOutputFormat.setOutputPath(jconf, outputDir);
+ RunningJob runJob = jobClient.submitJob(jconf);
+ JobID id = runJob.getID();
+ JobInfo jInfo = remoteJTClient.getJobInfo(id);
+
+ // Poll up to 60 seconds for the job to reach RUNNING state.
+ int counter = 0;
+ while (counter < 60) {
+ if (jInfo.getStatus().getRunState() == JobStatus.RUNNING) {
+ break;
+ } else {
+ UtilsForTests.waitFor(1000);
+ jInfo = remoteJTClient.getJobInfo(id);
+ }
+ counter++;
+ }
+ Assert.assertTrue("Job has not been started for 1 min.", counter != 60);
+
+ // NOTE(review): same jobStatus[0] assumption as in
+ // testDirCleanupAfterTaskKilled — verify when other jobs may be present.
+ JobStatus[] jobStatus = jobClient.getAllJobs();
+ String userName = jobStatus[0].getUsername();
+ TaskInfo[] taskInfos = remoteJTClient.getTaskInfo(id);
+ for (TaskInfo taskinfo : taskInfos) {
+ if (!taskinfo.isSetupOrCleanup()) {
+ taskInfo = taskinfo;
+ break;
+ }
+ }
+
+ tID = TaskID.downgrade(taskInfo.getTaskID());
+ FinishTaskControlAction action = new FinishTaskControlAction(tID);
+ String[] taskTrackers = taskInfo.getTaskTrackers();
+ // Poll up to 30 seconds for a tracker to be assigned to the task.
+ counter = 0;
+ while (counter < 30) {
+ if (taskTrackers.length != 0) {
+ break;
+ }
+ UtilsForTests.waitFor(1000);
+ taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
+ taskTrackers = taskInfo.getTaskTrackers();
+ counter++;
+ }
+ Assert.assertTrue("Task tracker not found.", taskTrackers.length != 0);
+ // Tracker name format is "tracker_<host>:<port>..."; extract the host.
+ String hostName = taskTrackers[0].split("_")[1];
+ hostName = hostName.split(":")[0];
+ ttClient = cluster.getTTClient(hostName);
+ ttClient.getProxy().sendAction(action);
+
+ // Poll up to 60 seconds for the task's first attempt to be RUNNING.
+ counter = 0;
+ while (counter < 60) {
+ if (taskInfo.getTaskStatus().length > 0) {
+ if (taskInfo.getTaskStatus()[0].getRunState() == TaskStatus.State.RUNNING) {
+ break;
+ }
+ }
+ UtilsForTests.waitFor(1000);
+ taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
+ counter++;
+ }
+ Assert.assertTrue("Task has not been started for 1 min.", counter != 60);
+
+ // Locate the attempt's working directory among the mapred.local.dirs.
+ String localDirs[] = ttClient.getMapredLocalDirs();
+ TaskAttemptID taskAttID = new TaskAttemptID(tID, 0);
+ for (String localDir : localDirs) {
+ localTaskDir =
+ localDir
+ + "/"
+ + TaskTracker.getLocalTaskDir(userName, id.toString(), taskAttID
+ .toString());
+ filesStatus = ttClient.listStatus(localTaskDir, true);
+ if (filesStatus.length > 0) {
+ isTempFolderExists = true;
+ break;
+ }
+ }
+
+ taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
+ Assert.assertTrue(
+ "Task Attempt directory "
+ + taskAttID + " has not been found while task was running.",
+ isTempFolderExists);
+ // Unconditional 30-second wait for the mapper to fail and the state to
+ // propagate.
+ counter = 0;
+ while (counter < 30) {
+ UtilsForTests.waitFor(1000);
+ taskInfo = remoteJTClient.getTaskInfo(tID);
+ counter++;
+ }
+
+ // NOTE(review): expected/actual are swapped here (actual first); the
+ // comparison still works but failure output will be misleading.
+ Assert.assertEquals("Task status has not been changed to FAILED.", taskInfo
+ .getTaskStatus()[0].getRunState(), TaskStatus.State.FAILED);
+
+ filesStatus = ttClient.listStatus(localTaskDir, true);
+ Assert.assertTrue(
+ "Temporary folder has not been cleanup.", filesStatus.length == 0);
+ }
+
+ // Mapper that sleeps ~240 seconds (giving the test time to inspect the
+ // attempt's local directory) and then throws IOException so the single
+ // allowed attempt fails.
+ public static class FailedMapperClass
+ implements Mapper<NullWritable, NullWritable, NullWritable, NullWritable> {
+ public void configure(JobConf job) {
+ }
+
+ public void map(
+ NullWritable key, NullWritable value,
+ OutputCollector<NullWritable, NullWritable> output, Reporter reporter)
+ throws IOException {
+ int counter = 0;
+ while (counter < 240) {
+ UtilsForTests.waitFor(1000);
+ counter++;
+ }
+ // counter is always 240 after the loop, so this always throws.
+ if (counter == 240) {
+ throw new IOException();
+ }
+ }
+
+ public void close() {
+ }
+ }
+
+ @Test
+ /**
+ * This tests verification of job killing by killing of all task
+ * attempts of a particular task
+ * @param none
+ * @return void
+ */
+ public void testAllTaskAttemptKill() throws Exception {
+ Configuration conf = new Configuration(cluster.getConf());
+
+ JobStatus[] jobStatus = null;
+
+ SleepJob job = new SleepJob();
+ job.setConf(conf);
+ Job slpJob = job.createJob(3, 1, 40000, 1000, 100, 100);
+ JobConf jconf = new JobConf(conf);
+
+ // Submitting the job
+ slpJob.submit();
+ RunningJob rJob =
+ cluster.getJTClient().getClient().getJob(
+ org.apache.hadoop.mapred.JobID.downgrade(slpJob.getJobID()));
+
+ int MAX_MAP_TASK_ATTEMPTS =
+ Integer.parseInt(jconf.get(MRJobConfig.MAP_MAX_ATTEMPTS));
+
+ LOG.info("MAX_MAP_TASK_ATTEMPTS is : " + MAX_MAP_TASK_ATTEMPTS);
+
+ Assert.assertTrue(MAX_MAP_TASK_ATTEMPTS > 0);
+
+ TTClient tClient = null;
+ TTClient[] ttClients = null;
+
+ JobInfo jInfo = remoteJTClient.getJobInfo(rJob.getID());
+
+ // Assert if jobInfo is null
+ Assert.assertNotNull(jInfo.getStatus().getRunState());
+
+ // Wait for the job to start running.
+ while (jInfo.getStatus().getRunState() != JobStatus.RUNNING) {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ }
+ ;
+ jInfo = remoteJTClient.getJobInfo(rJob.getID());
+ }
+
+ // Temporarily store the jobid to use it later for comparison.
+ JobID jobidStore = rJob.getID();
+ jobidStore = JobID.downgrade(jobidStore);
+ LOG.info("job id is :" + jobidStore.toString());
+
+ TaskInfo[] taskInfos = null;
+
+ // After making sure that the job is running,
+ // the test execution has to make sure that
+ // at least one task has started running before continuing.
+ boolean runningCount = false;
+ int count = 0;
+ do {
+ taskInfos = cluster.getJTClient().getProxy().getTaskInfo(rJob.getID());
+ runningCount = false;
+ for (TaskInfo taskInfo : taskInfos) {
+ TaskStatus[] taskStatuses = taskInfo.getTaskStatus();
+ if (taskStatuses.length > 0) {
+ LOG.info("taskStatuses[0].getRunState() is :"
+ + taskStatuses[0].getRunState());
+ if (taskStatuses[0].getRunState() == TaskStatus.State.RUNNING) {
+ runningCount = true;
+ break;
+ } else {
+ LOG.info("Sleeping 5 seconds");
+ Thread.sleep(5000);
+ }
+ }
+ }
+ count++;
+ // If the count goes beyond a point, then break; This is to avoid
+ // infinite loop under unforeseen circumstances. Testcase will anyway
+ // fail later.
+ if (count > 10) {
+ Assert.fail("Since the sleep count has reached beyond a point"
+ + "failing at this point");
+ }
+ } while (!runningCount);
+
+ // This whole module is about getting the task Attempt id
+ // of one task and killing it MAX_MAP_TASK_ATTEMPTS times,
+ // whenever it re-attempts to run.
+ String taskIdKilled = null;
+ for (int i = 0; i < MAX_MAP_TASK_ATTEMPTS; i++) {
+ taskInfos = cluster.getJTClient().getProxy().getTaskInfo(rJob.getID());
+
+ for (TaskInfo taskInfo : taskInfos) {
+ TaskAttemptID taskAttemptID;
+ if (!taskInfo.isSetupOrCleanup()) {
+ // This is the task which is going to be killed continuously in
+ // all its task attempts. The first task is getting picked up.
+ TaskID taskid = TaskID.downgrade(taskInfo.getTaskID());
+ LOG.info("taskid is :" + taskid);
+ if (i == 0) {
+ taskIdKilled = taskid.toString();
+ taskAttemptID = new TaskAttemptID(taskid, i);
+ LOG.info("taskAttemptid going to be killed is : " + taskAttemptID);
+ (jobClient.new NetworkedJob(jInfo.getStatus())).killTask(
+ taskAttemptID, true);
+ checkTaskCompletionEvent(taskAttemptID, jInfo);
+ break;
+ } else {
+ if (taskIdKilled.equals(taskid.toString())) {
+ taskAttemptID = new TaskAttemptID(taskid, i);
+ LOG
+ .info("taskAttemptid going to be killed is : "
+ + taskAttemptID);
+ (jobClient.new NetworkedJob(jInfo.getStatus())).killTask(
+ taskAttemptID, true);
+ checkTaskCompletionEvent(taskAttemptID, jInfo);
+ break;
+ }
+ }
+ }
+ }
+ }
+ // Making sure that the job is complete.
+ while (jInfo != null && !jInfo.getStatus().isJobComplete()) {
+ Thread.sleep(10000);
+ jInfo = remoteJTClient.getJobInfo(rJob.getID());
+ }
+
+ // Making sure that the correct jobstatus is got from all the jobs
+ jobStatus = jobClient.getAllJobs();
+ JobStatus jobStatusFound = null;
+ for (JobStatus jobStatusTmp : jobStatus) {
+ if (JobID.downgrade(jobStatusTmp.getJobID()).equals(jobidStore)) {
+ jobStatusFound = jobStatusTmp;
+ LOG.info("jobStatus found is :" + jobStatusFound.getJobId().toString());
+ }
+ }
+
+ // Making sure that the job has FAILED
+ Assert.assertEquals(
+ "The job should have failed at this stage", JobStatus.FAILED,
+ jobStatusFound.getRunState());
+ }
+
+ // This method checks if task Attemptid occurs in the list
+ // of tasks that are completed (killed) for a job. This is
+ // required because after issuing a kill command, the task
+ // has to be killed and appear in the taskCompletion event.
+ // After this a new task attempt will start running in a
+ // matter of few seconds.
+ public void checkTaskCompletionEvent(
+ TaskAttemptID taskAttemptID, JobInfo jInfo) throws Exception {
+ boolean match = false;
+ int count = 0;
+ while (!match) {
+ TaskCompletionEvent[] taskCompletionEvents =
+ jobClient.new NetworkedJob(jInfo.getStatus())
+ .getTaskCompletionEvents(0);
+ for (TaskCompletionEvent taskCompletionEvent : taskCompletionEvents) {
+ if ((taskCompletionEvent.getTaskAttemptId().toString())
+ .equals(taskAttemptID.toString())) {
+ match = true;
+ // Sleeping for 10 seconds giving time for the next task
+ // attempt to run
+ Thread.sleep(10000);
+ break;
+ }
+ }
+ if (!match) {
+ LOG.info("Thread is sleeping for 10 seconds");
+ Thread.sleep(10000);
+ count++;
+ }
+ // If the count goes beyond a point, then break; This is to avoid
+ // infinite loop under unforeseen circumstances. Testcase will anyway
+ // fail later.
+ if (count > 10) {
+ Assert.fail("Since the task attemptid is not appearing in the"
+ + "TaskCompletionEvent, it seems this task attempt was not killed");
+ }
+ }
+ }
+}
Added: hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestTaskOwner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestTaskOwner.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestTaskOwner.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestTaskOwner.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import testjar.UserNamePermission;
+
+public class TestTaskOwner {
+ private static final Log LOG = LogFactory.getLog(TestTaskOwner.class);
+ private static Path outDir = new Path("output");
+ private static Path inDir = new Path("input");
+ public static MRCluster cluster;
+
+ // The role of this job is to write the user name to the output file
+ // which will be parsed
+
+ @BeforeClass
+ public static void setUp() throws java.lang.Exception {
+
+ cluster = MRCluster.createCluster(new Configuration());
+ cluster.setUp();
+ FileSystem fs = inDir.getFileSystem(cluster.getJTClient().getConf());
+ // Make sure that all is clean in case last tearDown wasn't successful
+ fs.delete(outDir, true);
+ fs.delete(inDir, true);
+
+ // Creates an empty FILE named "input" that the job reads as its input.
+ fs.create(inDir, true);
+ }
+
+ @Test
+ public void testProcessPermission() throws Exception {
+ // The user will submit a job which a plain old map reduce job
+ // this job will output the username of the task that is running
+ // in the cluster and we will authenticate whether matches
+ // with the job that is submitted by the same user.
+
+ Configuration conf = cluster.getJTClient().getConf();
+ Job job = new Job(conf, "user name check");
+
+ job.setJarByClass(UserNamePermission.class);
+ job.setMapperClass(UserNamePermission.UserNameMapper.class);
+ job.setCombinerClass(UserNamePermission.UserNameReducer.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+
+ job.setReducerClass(UserNamePermission.UserNameReducer.class);
+ job.setNumReduceTasks(1);
+
+ FileInputFormat.addInputPath(job, inDir);
+ FileOutputFormat.setOutputPath(job, outDir);
+
+ job.waitForCompletion(true);
+
+ // now verify the user name that is written by the task tracker is same
+ // as the
+ // user name that was used to launch the task in the first place
+ FileSystem fs = outDir.getFileSystem(conf);
+
+ Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
+ new Utils.OutputFileUtils.OutputFilesFilter()));
+
+ for (int i = 0; i < fileList.length; ++i) {
+ LOG.info("File list[" + i + "]" + ": " + fileList[i]);
+ BufferedReader file = new BufferedReader(new InputStreamReader(fs
+ .open(fileList[i])));
+ String line = file.readLine();
+ while (line != null) {
+ StringTokenizer token = new StringTokenizer(line);
+ if (token.hasMoreTokens()) {
+ LOG.info("First token " + token.nextToken());
+ // NOTE(review): this second nextToken() is not guarded by
+ // hasMoreTokens(); a one-token line would throw
+ // NoSuchElementException here.
+ String userName = token.nextToken();
+
+ LOG.info("Next token " + userName);
+ Assert
+ .assertEquals(
+ "The user name did not match permission violation ",
+ userName, System.getProperty("user.name")
+ .toString());
+ break;
+ }
+ }
+ // NOTE(review): reader is not closed if an assertion above fails;
+ // a try/finally would be safer.
+ file.close();
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws java.lang.Exception {
+ // Remove job input/output and shut the cluster down.
+ FileSystem fs = outDir.getFileSystem(cluster.getJTClient().getConf());
+ fs.delete(outDir, true);
+ fs.delete(inDir, true);
+ cluster.tearDown();
+ }
+}
+
+