Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:40:22 UTC
svn commit: r1077650 - in
/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred:
TestChildsKillingOfSuspendTask.java TestJobCacheDirectoriesCleanUp.java
TestTaskController.java TestTaskKilling.java
Author: omalley
Date: Fri Mar 4 04:40:22 2011
New Revision: 1077650
URL: http://svn.apache.org/viewvc?rev=1077650&view=rev
Log:
commit 516430ffecf8e7c090920514d8ade7cf6bb79b7e
Author: Vinay Kumar Thota <vi...@yahoo-inc.com>
Date: Sun Aug 8 20:38:56 2010 +0000
3867536 Fix the instability of MR system tests
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestChildsKillingOfSuspendTask.java
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobCacheDirectoriesCleanUp.java
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskController.java
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskKilling.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestChildsKillingOfSuspendTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestChildsKillingOfSuspendTask.java?rev=1077650&r1=1077649&r2=1077650&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestChildsKillingOfSuspendTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestChildsKillingOfSuspendTask.java Fri Mar 4 04:40:22 2011
@@ -1,312 +1,319 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.junit.Test;
-import org.junit.Assert;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.Log;
-
-import java.util.Collection;
-import java.util.Hashtable;
-
-import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
-import org.apache.hadoop.mapreduce.test.system.JTProtocol;
-import org.apache.hadoop.mapreduce.test.system.JobInfo;
-import org.apache.hadoop.mapreduce.test.system.MRCluster;
-import org.apache.hadoop.mapreduce.test.system.TTClient;
-import org.apache.hadoop.mapreduce.test.system.JTClient;
-import org.apache.hadoop.mapreduce.test.system.TTProtocol;
-import org.apache.hadoop.mapreduce.test.system.TTTaskInfo;
-import org.apache.hadoop.mapreduce.test.system.TaskInfo;
-import testjar.GenerateTaskChildProcess;
-
-public class TestChildsKillingOfSuspendTask {
- private static final Log LOG = LogFactory
- .getLog(TestChildsKillingOfSuspendTask.class);
- private static Configuration conf = new Configuration();
- private static MRCluster cluster;
- private static Path inputDir = new Path("input");
- private static Path outputDir = new Path("output");
- private static String confFile = "mapred-site.xml";
-
- @BeforeClass
- public static void before() throws Exception {
- Hashtable<String,Object> prop = new Hashtable<String,Object>();
- prop.put("mapred.map.max.attempts",1L);
- prop.put("mapred.task.timeout",30000L);
- prop.put("mapreduce.job.complete.cancel.delegation.tokens", false);
- String [] expExcludeList = {"java.net.ConnectException",
- "java.io.IOException"};
- cluster = MRCluster.createCluster(conf);
- cluster.setExcludeExpList(expExcludeList);
- cluster.setUp();
- cluster.restartClusterWithNewConfig(prop, confFile);
- UtilsForTests.waitFor(1000);
- conf = cluster.getJTClient().getProxy().getDaemonConf();
- createInput(inputDir, conf);
- }
- @AfterClass
- public static void after() throws Exception {
- cleanup(inputDir, conf);
- cleanup(outputDir, conf);
- cluster.tearDown();
- cluster.restart();
- }
-
- /**
- * Verify the process tree clean up of a task after
- * task is suspended and wait till the task is
- * terminated based on timeout.
- */
- @Test
- public void testProcessTreeCleanupOfSuspendTask() throws
- IOException {
- TaskInfo taskInfo = null;
- TaskID tID = null;
- TTTaskInfo [] ttTaskinfo = null;
- String pid = null;
- TTProtocol ttIns = null;
- TTClient ttClientIns = null;
- int counter = 0;
-
- JobConf jobConf = new JobConf(conf);
- jobConf.setJobName("Message Display");
- jobConf.setJarByClass(GenerateTaskChildProcess.class);
- jobConf.setMapperClass(GenerateTaskChildProcess.StrDisplayMapper.class);
- jobConf.setNumMapTasks(1);
- jobConf.setNumReduceTasks(0);
- jobConf.setMaxMapAttempts(1);
- cleanup(outputDir, conf);
- FileInputFormat.setInputPaths(jobConf, inputDir);
- FileOutputFormat.setOutputPath(jobConf, outputDir);
-
- JTClient jtClient = cluster.getJTClient();
- JobClient client = jtClient.getClient();
- JTProtocol wovenClient = cluster.getJTClient().getProxy();
- RunningJob runJob = client.submitJob(jobConf);
- JobID id = runJob.getID();
- JobInfo jInfo = wovenClient.getJobInfo(id);
- Assert.assertNotNull("Job information is null",jInfo);
-
- Assert.assertTrue("Job has not been started for 1 min.",
- jtClient.isJobStarted(id));
-
- TaskInfo[] taskInfos = wovenClient.getTaskInfo(id);
- for (TaskInfo taskinfo : taskInfos) {
- if (!taskinfo.isSetupOrCleanup()) {
- taskInfo = taskinfo;
- break;
- }
- }
-
- Assert.assertTrue("Task has not been started for 1 min.",
- jtClient.isTaskStarted(taskInfo));
-
- tID = TaskID.downgrade(taskInfo.getTaskID());
- TaskAttemptID tAttID = new TaskAttemptID(tID,0);
- FinishTaskControlAction action = new FinishTaskControlAction(tID);
-
- Collection<TTClient> ttClients = cluster.getTTClients();
- for (TTClient ttClient : ttClients) {
- TTProtocol tt = ttClient.getProxy();
- tt.sendAction(action);
- ttTaskinfo = tt.getTasks();
- for (TTTaskInfo tttInfo : ttTaskinfo) {
- if (!tttInfo.isTaskCleanupTask()) {
- pid = tttInfo.getPid();
- ttClientIns = ttClient;
- ttIns = tt;
- break;
- }
- }
- if (ttClientIns != null) {
- break;
- }
- }
- Assert.assertTrue("Map process tree is not alive before task suspend.",
- ttIns.isProcessTreeAlive(pid));
- LOG.info("Suspend the task of process id " + pid);
- boolean exitCode = ttIns.suspendProcess(pid);
- Assert.assertTrue("Process(" + pid + ") has not been suspended",
- exitCode);
-
- LOG.info("Waiting till the task is failed...");
- taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID());
- counter = 0;
- while (counter < 60) {
- if (taskInfo.getTaskStatus().length > 0) {
- if (taskInfo.getTaskStatus()[0].getRunState() ==
- TaskStatus.State.FAILED) {
- break;
- }
- }
- UtilsForTests.waitFor(1000);
- taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID());
- counter ++;
- }
- Assert.assertTrue("Suspended task is failed "
- + "before the timeout interval.", counter > 30 &&
- taskInfo.getTaskStatus()[0].getRunState() == TaskStatus.State.FAILED);
-
- LOG.info("Waiting till the job is completed...");
- counter = 0;
- while (counter < 60) {
- if (jInfo.getStatus().isJobComplete()) {
- break;
- }
- UtilsForTests.waitFor(1000);
- jInfo = wovenClient.getJobInfo(id);
- counter ++;
- }
- Assert.assertTrue("Job has not been completed for 1 min.",
- counter != 60);
- ttIns = ttClientIns.getProxy();
- UtilsForTests.waitFor(1000);
- Assert.assertTrue("Map process is still alive after task has been failed.",
- !ttIns.isProcessTreeAlive(pid));
- }
-
- /**
- * Verify the process tree cleanup of task after task
- * is suspended and resumed the task before the timeout.
- */
- @Test
- public void testProcessTreeCleanupOfSuspendAndResumeTask() throws
- IOException {
- TaskInfo taskInfo = null;
- TaskID tID = null;
- TTTaskInfo [] ttTaskinfo = null;
- String pid = null;
- TTProtocol ttIns = null;
- TTClient ttClientIns = null;
- int counter = 0;
-
- JobConf jobConf = new JobConf(conf);
- jobConf.setJobName("Message Display");
- jobConf.setJarByClass(GenerateTaskChildProcess.class);
- jobConf.setMapperClass(GenerateTaskChildProcess.StrDisplayMapper.class);
- jobConf.setNumMapTasks(1);
- jobConf.setNumReduceTasks(0);
- jobConf.setMaxMapAttempts(1);
- cleanup(outputDir, conf);
- FileInputFormat.setInputPaths(jobConf, inputDir);
- FileOutputFormat.setOutputPath(jobConf, outputDir);
-
- JTClient jtClient = cluster.getJTClient();
- JobClient client = jtClient.getClient();
- JTProtocol wovenClient = cluster.getJTClient().getProxy();
- RunningJob runJob = client.submitJob(jobConf);
- JobID id = runJob.getID();
- JobInfo jInfo = wovenClient.getJobInfo(id);
- Assert.assertNotNull("Job information is null",jInfo);
-
- Assert.assertTrue("Job has not been started for 1 min.",
- jtClient.isJobStarted(id));
-
- TaskInfo[] taskInfos = wovenClient.getTaskInfo(id);
- for (TaskInfo taskinfo : taskInfos) {
- if (!taskinfo.isSetupOrCleanup()) {
- taskInfo = taskinfo;
- break;
- }
- }
-
- Assert.assertTrue("Task has not been started for 1 min.",
- jtClient.isTaskStarted(taskInfo));
-
- tID = TaskID.downgrade(taskInfo.getTaskID());
- TaskAttemptID tAttID = new TaskAttemptID(tID,0);
- FinishTaskControlAction action = new FinishTaskControlAction(tID);
-
- Collection<TTClient> ttClients = cluster.getTTClients();
- for (TTClient ttClient : ttClients) {
- TTProtocol tt = ttClient.getProxy();
- tt.sendAction(action);
- ttTaskinfo = tt.getTasks();
- for (TTTaskInfo tttInfo : ttTaskinfo) {
- if (!tttInfo.isTaskCleanupTask()) {
- pid = tttInfo.getPid();
- ttClientIns = ttClient;
- ttIns = tt;
- break;
- }
- }
- if (ttClientIns != null) {
- break;
- }
- }
- Assert.assertTrue("Map process tree is not alive before task suspend.",
- ttIns.isProcessTreeAlive(pid));
- LOG.info("Suspend the task of process id " + pid);
- boolean exitCode = ttIns.suspendProcess(pid);
- Assert.assertTrue("Process(" + pid + ") has not been suspended",
- exitCode);
- Assert.assertTrue("Map process is not alive after task "
- + "has been suspended.", ttIns.isProcessTreeAlive(pid));
- UtilsForTests.waitFor(5000);
- exitCode = ttIns.resumeProcess(pid);
- Assert.assertTrue("Suspended process(" + pid + ") has not been resumed",
- exitCode);
- UtilsForTests.waitFor(35000);
- taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID());
- Assert.assertTrue("Suspended task has not been resumed",
- taskInfo.getTaskStatus()[0].getRunState() ==
- TaskStatus.State.RUNNING);
- UtilsForTests.waitFor(1000);
- Assert.assertTrue("Map process tree is not alive after task is resumed.",
- ttIns.isProcessTreeAlive(pid));
- }
-
- private static void cleanup(Path dir, Configuration conf) throws
- IOException {
- FileSystem fs = dir.getFileSystem(conf);
- fs.delete(dir, true);
- }
-
- private static void createInput(Path inDir, Configuration conf) throws
- IOException {
- String input = "Hadoop is framework for data intensive distributed "
- + "applications.\n Hadoop enables applications "
- + "to work with thousands of nodes.";
- FileSystem fs = inDir.getFileSystem(conf);
- if (!fs.mkdirs(inDir)) {
- throw new IOException("Failed to create the input directory:"
- + inDir.toString());
- }
- fs.setPermission(inDir, new FsPermission(FsAction.ALL,
- FsAction.ALL, FsAction.ALL));
- DataOutputStream file = fs.create(new Path(inDir, "data.txt"));
- int i = 0;
- while(i < 10) {
- file.writeBytes(input);
- i++;
- }
- file.close();
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.junit.Test;
+import org.junit.Assert;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.common.RemoteExecution;
+
+import java.util.Collection;
+import java.util.Hashtable;
+
+import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.mapreduce.test.system.JTClient;
+import org.apache.hadoop.mapreduce.test.system.TTProtocol;
+import org.apache.hadoop.mapreduce.test.system.TTTaskInfo;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import testjar.GenerateTaskChildProcess;
+
+public class TestChildsKillingOfSuspendTask {
+ private static final Log LOG = LogFactory
+ .getLog(TestChildsKillingOfSuspendTask.class);
+ private static Configuration conf = new Configuration();
+ private static MRCluster cluster;
+ private static Path inputDir = new Path("input");
+ private static Path outputDir = new Path("output");
+ private static String confFile = "mapred-site.xml";
+
+ @BeforeClass
+ public static void before() throws Exception {
+ Hashtable<String,Object> prop = new Hashtable<String,Object>();
+ prop.put("mapred.map.max.attempts",1L);
+ prop.put("mapred.task.timeout",30000L);
+ prop.put("mapreduce.job.complete.cancel.delegation.tokens", false);
+ String [] expExcludeList = {"java.net.ConnectException",
+ "java.io.IOException","org.apache.hadoop.metrics2.MetricsException"};
+ cluster = MRCluster.createCluster(conf);
+ cluster.setExcludeExpList(expExcludeList);
+ cluster.setUp();
+ cluster.restartClusterWithNewConfig(prop, confFile);
+ UtilsForTests.waitFor(1000);
+ conf = cluster.getJTClient().getProxy().getDaemonConf();
+ createInput(inputDir, conf);
+ }
+ @AfterClass
+ public static void after() throws Exception {
+ cleanup(inputDir, conf);
+ cleanup(outputDir, conf);
+ cluster.tearDown();
+ // cluster.restart();
+ }
+
+ /**
+ * Verify the process tree clean up of a task after
+ * task is suspended and wait till the task is
+ * terminated based on timeout.
+ */
+ @Test
+ public void testProcessTreeCleanupOfSuspendTask() throws
+ Exception {
+ TaskInfo taskInfo = null;
+ TaskID tID = null;
+ TTTaskInfo [] ttTaskinfo = null;
+ String pid = null;
+ TTProtocol ttIns = null;
+ TTClient ttClientIns = null;
+ int counter = 0;
+
+ JobConf jobConf = new JobConf(conf);
+ jobConf.setJobName("Message Display");
+ jobConf.setJarByClass(GenerateTaskChildProcess.class);
+ jobConf.setMapperClass(GenerateTaskChildProcess.StrDisplayMapper.class);
+ jobConf.setNumMapTasks(1);
+ jobConf.setNumReduceTasks(0);
+ jobConf.setMaxMapAttempts(1);
+ cleanup(outputDir, conf);
+ FileInputFormat.setInputPaths(jobConf, inputDir);
+ FileOutputFormat.setOutputPath(jobConf, outputDir);
+
+ JTClient jtClient = cluster.getJTClient();
+ JobClient client = jtClient.getClient();
+ JTProtocol wovenClient = cluster.getJTClient().getProxy();
+ RunningJob runJob = client.submitJob(jobConf);
+ JobID id = runJob.getID();
+ JobInfo jInfo = wovenClient.getJobInfo(id);
+ Assert.assertNotNull("Job information is null",jInfo);
+
+ Assert.assertTrue("Job has not been started for 1 min.",
+ jtClient.isJobStarted(id));
+ JobStatus[] jobStatus = client.getAllJobs();
+ String userName = jobStatus[0].getUsername();
+
+ TaskInfo[] taskInfos = wovenClient.getTaskInfo(id);
+ for (TaskInfo taskinfo : taskInfos) {
+ if (!taskinfo.isSetupOrCleanup()) {
+ taskInfo = taskinfo;
+ break;
+ }
+ }
+
+ Assert.assertTrue("Task has not been started for 1 min.",
+ jtClient.isTaskStarted(taskInfo));
+
+ tID = TaskID.downgrade(taskInfo.getTaskID());
+ TaskAttemptID tAttID = new TaskAttemptID(tID,0);
+ FinishTaskControlAction action = new FinishTaskControlAction(tID);
+
+ Collection<TTClient> ttClients = cluster.getTTClients();
+ for (TTClient ttClient : ttClients) {
+ TTProtocol tt = ttClient.getProxy();
+ tt.sendAction(action);
+ ttTaskinfo = tt.getTasks();
+ for (TTTaskInfo tttInfo : ttTaskinfo) {
+ if (!tttInfo.isTaskCleanupTask()) {
+ pid = tttInfo.getPid();
+ ttClientIns = ttClient;
+ ttIns = tt;
+ break;
+ }
+ }
+ if (ttClientIns != null) {
+ break;
+ }
+ }
+ Assert.assertTrue("Map process tree is not alive before task suspend.",
+ ttIns.isProcessTreeAlive(pid));
+ LOG.info("Suspend the task of process id " + pid);
+ ExecuteShellCommand execcmd = new ExecuteShellCommand(userName,
+ ttClientIns.getHostName(), "kill -SIGSTOP " + pid);
+ execcmd.start();
+ execcmd.join();
+ UtilsForTests.waitFor(30000);
+ Assert.assertTrue("Process(" + pid + ") has not been suspended",
+ execcmd.getStatus());
+ ttIns = ttClientIns.getProxy();
+ UtilsForTests.waitFor(1000);
+ Assert.assertTrue("Map process is still alive after task has been failed.",
+ !ttIns.isProcessTreeAlive(pid));
+ }
+
+ /**
+ * Verify the process tree cleanup of task after task
+ * is suspended and resumed the task before the timeout.
+ */
+ @Test
+ public void testProcessTreeCleanupOfSuspendAndResumeTask() throws
+ Exception {
+ TaskInfo taskInfo = null;
+ TaskID tID = null;
+ TTTaskInfo [] ttTaskinfo = null;
+ String pid = null;
+ TTProtocol ttIns = null;
+ TTClient ttClientIns = null;
+ int counter = 0;
+
+ JobConf jobConf = new JobConf(conf);
+ jobConf.setJobName("Message Display");
+ jobConf.setJarByClass(GenerateTaskChildProcess.class);
+ jobConf.setMapperClass(GenerateTaskChildProcess.StrDisplayMapper.class);
+ jobConf.setNumMapTasks(1);
+ jobConf.setNumReduceTasks(0);
+ jobConf.setMaxMapAttempts(1);
+ cleanup(outputDir, conf);
+ FileInputFormat.setInputPaths(jobConf, inputDir);
+ FileOutputFormat.setOutputPath(jobConf, outputDir);
+
+ JTClient jtClient = cluster.getJTClient();
+ JobClient client = jtClient.getClient();
+ JTProtocol wovenClient = cluster.getJTClient().getProxy();
+ RunningJob runJob = client.submitJob(jobConf);
+ JobID id = runJob.getID();
+ JobInfo jInfo = wovenClient.getJobInfo(id);
+ Assert.assertNotNull("Job information is null",jInfo);
+
+ Assert.assertTrue("Job has not been started for 1 min.",
+ jtClient.isJobStarted(id));
+
+ JobStatus[] jobStatus = client.getAllJobs();
+ String userName = jobStatus[0].getUsername();
+
+ TaskInfo[] taskInfos = wovenClient.getTaskInfo(id);
+ for (TaskInfo taskinfo : taskInfos) {
+ if (!taskinfo.isSetupOrCleanup()) {
+ taskInfo = taskinfo;
+ break;
+ }
+ }
+
+ Assert.assertTrue("Task has not been started for 1 min.",
+ jtClient.isTaskStarted(taskInfo));
+
+ tID = TaskID.downgrade(taskInfo.getTaskID());
+ TaskAttemptID tAttID = new TaskAttemptID(tID,0);
+ FinishTaskControlAction action = new FinishTaskControlAction(tID);
+
+ Collection<TTClient> ttClients = cluster.getTTClients();
+ for (TTClient ttClient : ttClients) {
+ TTProtocol tt = ttClient.getProxy();
+ tt.sendAction(action);
+ ttTaskinfo = tt.getTasks();
+ for (TTTaskInfo tttInfo : ttTaskinfo) {
+ if (!tttInfo.isTaskCleanupTask()) {
+ pid = tttInfo.getPid();
+ ttClientIns = ttClient;
+ ttIns = tt;
+ break;
+ }
+ }
+ if (ttClientIns != null) {
+ break;
+ }
+ }
+ Assert.assertTrue("Map process tree is not alive before task suspend.",
+ ttIns.isProcessTreeAlive(pid));
+ LOG.info("Suspend the task of process id " + pid);
+ ExecuteShellCommand execcmd = new ExecuteShellCommand(userName,
+ ttClientIns.getHostName(), "kill -SIGSTOP " + pid);
+ execcmd.start();
+ execcmd.join();
+
+ Assert.assertTrue("Process(" + pid + ") has not been suspended",
+ execcmd.getStatus());
+ Assert.assertTrue("Map process is not alive after task "
+ + "has been suspended.", ttIns.isProcessTreeAlive(pid));
+ UtilsForTests.waitFor(5000);
+ ExecuteShellCommand execcmd1 = new ExecuteShellCommand(userName,
+ ttClientIns.getHostName(), "kill -SIGCONT " + pid);
+ execcmd1.start();
+ execcmd1.join();
+ Assert.assertTrue("Suspended process(" + pid + ") has not been resumed",
+ execcmd1.getStatus());
+ UtilsForTests.waitFor(5000);
+ Assert.assertTrue("Map process tree is not alive after task is resumed.",
+ ttIns.isProcessTreeAlive(pid));
+ }
+
+ private static void cleanup(Path dir, Configuration conf) throws
+ IOException {
+ FileSystem fs = dir.getFileSystem(conf);
+ fs.delete(dir, true);
+ }
+
+ private static void createInput(Path inDir, Configuration conf) throws
+ IOException {
+ String input = "Hadoop is framework for data intensive distributed "
+ + "applications.\n Hadoop enables applications "
+ + "to work with thousands of nodes.";
+ FileSystem fs = inDir.getFileSystem(conf);
+ if (!fs.mkdirs(inDir)) {
+ throw new IOException("Failed to create the input directory:"
+ + inDir.toString());
+ }
+ fs.setPermission(inDir, new FsPermission(FsAction.ALL,
+ FsAction.ALL, FsAction.ALL));
+ DataOutputStream file = fs.create(new Path(inDir, "data.txt"));
+ int i = 0;
+ while(i < 10) {
+ file.writeBytes(input);
+ i++;
+ }
+ file.close();
+ }
+
+ class ExecuteShellCommand extends Thread {
+ String userName;
+ String cmd;
+ String hostName;
+ boolean exitStatus;
+ public ExecuteShellCommand(String userName, String hostName, String cmd) {
+ this.userName = userName;
+ this.hostName = hostName;
+ this.cmd = cmd;
+ }
+ public void run() {
+ try {
+ RemoteExecution.executeCommand(hostName, userName, cmd);
+ exitStatus = true;
+ } catch(InterruptedException iexp) {
+ LOG.warn("Thread is interrupted:" + iexp.getMessage());
+ exitStatus = false;
+ } catch(Exception exp) {
+ LOG.warn("Exception:" + exp.getMessage());
+ exitStatus = false;
+ }
+ }
+ public boolean getStatus(){
+ return exitStatus;
+ }
+ }
+}
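Note on the diff above: the suspend and resume steps no longer go through the TTProtocol suspendProcess()/resumeProcess() RPCs; they now shell out to "kill -SIGSTOP" and "kill -SIGCONT" on the tracker host as the submitting user, via the new ExecuteShellCommand thread and the framework's RemoteExecution.executeCommand(hostName, userName, cmd) helper. A minimal standalone sketch of the same pattern follows, with a plain ssh invocation standing in for RemoteExecution; the user, host, and pid values are illustrative assumptions, not taken from the patch:

    // Runs a signal command on a remote host from a worker thread and records
    // whether it exited cleanly, mirroring the ExecuteShellCommand helper in
    // the patch. "ssh user@host cmd" is an assumed stand-in for
    // RemoteExecution.executeCommand(hostName, userName, cmd).
    public class SignalProcessSketch extends Thread {
      private final String userName, hostName, cmd;
      private volatile boolean ok;

      public SignalProcessSketch(String userName, String hostName, String cmd) {
        this.userName = userName;
        this.hostName = hostName;
        this.cmd = cmd;
      }

      @Override
      public void run() {
        try {
          Process p = new ProcessBuilder("ssh", userName + "@" + hostName, cmd)
              .inheritIO().start();
          ok = (p.waitFor() == 0);  // success only on a clean exit
        } catch (Exception e) {
          ok = false;
        }
      }

      public boolean getStatus() {
        return ok;
      }

      public static void main(String[] args) throws Exception {
        // Illustrative values only: suspend pid 12345 on host "tt-host".
        SignalProcessSketch stop =
            new SignalProcessSketch("hadoopqa", "tt-host", "kill -SIGSTOP 12345");
        stop.start();
        stop.join();
        System.out.println("suspended: " + stop.getStatus());
      }
    }

The 30-second wait that follows the SIGSTOP in the first test lines up with the 30000 ms mapred.task.timeout configured in before(), which is what allows the suspended attempt to be failed and its process tree reaped before the assertion runs.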
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobCacheDirectoriesCleanUp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobCacheDirectoriesCleanUp.java?rev=1077650&r1=1077649&r2=1077650&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobCacheDirectoriesCleanUp.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobCacheDirectoriesCleanUp.java Fri Mar 4 04:40:22 2011
@@ -186,7 +186,9 @@ public class TestJobCacheDirectoriesClea
TaskID taskId = TaskID.downgrade(taskinfo.getTaskID());
TaskAttemptID taskAttID = new TaskAttemptID(taskId,
taskinfo.numFailedAttempts());
- while(taskinfo.numFailedAttempts() < 4) {
+ int MAX_MAP_TASK_ATTEMPTS = Integer.
+ parseInt(jobConf.get("mapred.map.max.attempts"));
+ while(taskinfo.numFailedAttempts() < MAX_MAP_TASK_ATTEMPTS) {
NetworkedJob networkJob = jtClient.getClient().
new NetworkedJob(jobInfo.getStatus());
networkJob.killTask(taskAttID, true);
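The hunk above replaces a hardcoded retry limit of 4 with the job's configured mapred.map.max.attempts. A hedged sketch of the same lookup using Configuration.getInt() with a default, which avoids the exception Integer.parseInt() would throw if the key were ever unset (4 matches both the old hardcoded limit and Hadoop's shipped default):

    import org.apache.hadoop.mapred.JobConf;

    public class MaxAttemptsLookupSketch {
      public static void main(String[] args) {
        JobConf jobConf = new JobConf();
        // getInt() falls back to the supplied default when the key is absent,
        // whereas Integer.parseInt(jobConf.get("mapred.map.max.attempts"))
        // would throw on a null value.
        int maxMapAttempts = jobConf.getInt("mapred.map.max.attempts", 4);
        System.out.println("kill attempts until " + maxMapAttempts + " failures");
      }
    }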
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskController.java?rev=1077650&r1=1077649&r2=1077650&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskController.java Fri Mar 4 04:40:22 2011
@@ -5,23 +5,19 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.test.system.MRCluster;
-import org.apache.hadoop.mapreduce.test.system.TTClient;
import org.apache.hadoop.mapreduce.test.system.JTClient;
import org.apache.hadoop.mapreduce.test.system.JTProtocol;
import org.apache.hadoop.mapreduce.test.system.JobInfo;
-import org.apache.hadoop.mapreduce.test.system.TaskInfo;
-import org.apache.hadoop.mapred.ClusterWithLinuxTaskController;
+import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.examples.SleepJob;
import org.junit.Before;
import org.junit.After;
import org.junit.Test;
import org.junit.Assert;
-import java.io.IOException;
-import java.util.Hashtable;
/**
- * Set the invalid configuration to task controller and verify whether the
- * task status of a job.
+ * Set the invalid configuration to task controller and verify the
+ * job status.
*/
public class TestTaskController {
private static final Log LOG = LogFactory.getLog(TestTaskController.class);
@@ -29,19 +25,14 @@ public class TestTaskController {
private static MRCluster cluster;
private static JTProtocol remoteJTClient;
private static JTClient jtClient;
- private static String confFile = "mapred-site.xml";
+
@Before
public void before() throws Exception {
- Hashtable<String,Object> prop = new Hashtable<String,Object>();
- prop.put("mapred.local.dir","/mapred/local");
- prop.put("mapred.map.max.attempts", 1L);
- prop.put("mapreduce.job.complete.cancel.delegation.tokens", false);
String [] expExcludeList = {"java.net.ConnectException",
- "java.io.IOException"};
+ "java.io.IOException"};
cluster = MRCluster.createCluster(conf);
cluster.setExcludeExpList(expExcludeList);
cluster.setUp();
- cluster.restartClusterWithNewConfig(prop, confFile);
jtClient = cluster.getJTClient();
remoteJTClient = jtClient.getProxy();
}
@@ -49,12 +40,11 @@ public class TestTaskController {
@After
public void after() throws Exception {
cluster.tearDown();
- cluster.restart();
}
/**
* Set the invalid mapred local directory location and run the job.
- * Verify whether job has failed or not.
+ * Verify the job status.
* @throws Exception - if an error occurs.
*/
@Test
@@ -63,50 +53,32 @@ public class TestTaskController {
conf = remoteJTClient.getDaemonConf();
if (conf.get("mapred.task.tracker.task-controller").
equals("org.apache.hadoop.mapred.LinuxTaskController")) {
- TaskController linuxTC = new LinuxTaskController();
- linuxTC.setConf(conf);
+ StringBuffer mapredLocalDir = new StringBuffer();
+ LOG.info("JobConf.MAPRED_LOCAL_DIR_PROPERTY:" + conf.get(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
+ mapredLocalDir.append(conf.get(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
+ mapredLocalDir.append(",");
+ mapredLocalDir.append("/mapred/local");
+ String jobArgs []= {"-D","mapred.local.dir=" + mapredLocalDir.toString(),
+ "-m", "1",
+ "-r", "1",
+ "-mt", "1000",
+ "-rt", "1000",
+ "-recordt","100"};
SleepJob job = new SleepJob();
- job.setConf(conf);
- final JobConf jobConf = job.setupJobConf(2, 1, 4000, 4000, 100, 100);
+ JobConf jobConf = new JobConf(conf);
+ int exitStatus = ToolRunner.run(jobConf, job, jobArgs);
+ Assert.assertEquals("Exit Code:", 0, exitStatus);
+ UtilsForTests.waitFor(100);
JobClient jobClient = jtClient.getClient();
- RunningJob runJob = jobClient.submitJob(jobConf);
- JobID jobId = runJob.getID();
- Assert.assertTrue("Job has not been started for 1 min.",
- jtClient.isJobStarted(jobId));
- TaskInfo[] taskInfos = remoteJTClient.getTaskInfo(jobId);
- TaskInfo taskInfo = null;
- for (TaskInfo taskinfo : taskInfos) {
- if (!taskinfo.isSetupOrCleanup()) {
- taskInfo = taskinfo;
- break;
- }
- }
- Assert.assertTrue("Task has not been started for 1 min.",
- jtClient.isTaskStarted(taskInfo));
- TaskID tID = TaskID.downgrade(taskInfo.getTaskID());
- TaskAttemptID taskAttID = new TaskAttemptID(tID, 0);
- TaskStatus taskStatus = null;
- int counter = 0;
- while(counter++ < 60) {
- if (taskInfo.getTaskStatus().length > 0) {
- taskStatus = taskInfo.getTaskStatus()[0];
- break;
- }
- taskInfo = remoteJTClient.getTaskInfo(tID);
- UtilsForTests.waitFor(1000);
- }
- while (taskInfo.getTaskStatus()[0].getRunState() ==
- TaskStatus.State.RUNNING) {
- UtilsForTests.waitFor(1000);
- taskInfo = remoteJTClient.getTaskInfo(tID);
- }
- Assert.assertTrue("Job has not been stopped for 1 min.",
- jtClient.isJobStopped(jobId));
- JobInfo jobInfo = remoteJTClient.getJobInfo(jobId);
- Assert.assertEquals("Job has not been failed",
- jobInfo.getStatus().getRunState(), JobStatus.FAILED);
- } else {
- Assert.assertTrue("Linux Task controller not found.", false);
- }
+ JobID jobId =jobClient.getAllJobs()[0].getJobID();
+ LOG.info("JobId:" + jobId);
+ if (jobId != null) {
+ JobInfo jInfo = remoteJTClient.getJobInfo(jobId);
+ Assert.assertEquals("Job has not been succeeded",
+ jInfo.getStatus().getRunState(), JobStatus.SUCCEEDED);
+ }
+ } else {
+ Assert.assertTrue("Linux Task controller not found.", false);
+ }
}
}
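The rewritten test above no longer restarts the cluster with a modified mapred-site.xml. Instead it appends an extra directory to the cluster's existing mapred.local.dir for a single job by passing -D through ToolRunner, whose GenericOptionsParser scopes the override to that submission; since the original valid directories remain in the list, the job is now expected to succeed rather than fail. A minimal sketch of that submission pattern, with illustrative directory values:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.examples.SleepJob;
    import org.apache.hadoop.util.ToolRunner;

    public class SleepJobWithLocalDirOverride {
      public static void main(String[] args) throws Exception {
        // -D name=value pairs are consumed by GenericOptionsParser before
        // SleepJob sees its own -m/-r/-mt/-rt flags, so the override applies
        // to this job only, not to the whole cluster.
        String[] jobArgs = {
            "-D", "mapred.local.dir=/grid/0/mapred/local,/mapred/local",
            "-m", "1", "-r", "1", "-mt", "1000", "-rt", "1000"};
        int exitStatus = ToolRunner.run(new Configuration(), new SleepJob(), jobArgs);
        System.exit(exitStatus);
      }
    }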
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskKilling.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskKilling.java?rev=1077650&r1=1077649&r2=1077650&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskKilling.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskKilling.java Fri Mar 4 04:40:22 2011
@@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.test.
import org.apache.hadoop.mapreduce.test.system.JobInfo;
import org.apache.hadoop.mapreduce.test.system.TaskInfo;
import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.mapreduce.test.system.JTClient;
import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
import org.apache.hadoop.mapred.JobClient.NetworkedJob;
import org.apache.hadoop.io.NullWritable;
@@ -53,18 +54,17 @@ public class TestTaskKilling {
private static final Log LOG = LogFactory.getLog(TestTaskKilling.class);
private static MRCluster cluster;
private static JobClient jobClient = null;
+ private static JTClient jtClient = null;
private static JTProtocol remoteJTClient = null;
-
- public TestTaskKilling() {
- }
+ private static Configuration conf = new Configuration();
@BeforeClass
- public static void before() throws Exception {
- Configuration conf = new Configuration();
+ public static void before() throws Exception {
cluster = MRCluster.createCluster(conf);
cluster.setUp();
- jobClient = cluster.getJTClient().getClient();
- remoteJTClient = cluster.getJTClient().getProxy();
+ jtClient = cluster.getJTClient();
+ jobClient = jtClient.getClient();
+ remoteJTClient = jtClient.getProxy();
}
@AfterClass
@@ -79,64 +79,39 @@ public class TestTaskKilling {
@Test
public void testFailedTaskJobStatus() throws IOException,
InterruptedException {
- Configuration conf = new Configuration(cluster.getConf());
+ conf = remoteJTClient.getDaemonConf();
TaskInfo taskInfo = null;
SleepJob job = new SleepJob();
job.setConf(conf);
- conf = job.setupJobConf(3, 1, 4000, 4000, 100, 100);
- JobConf jobConf = new JobConf(conf);
- jobConf.setMaxMapAttempts(20);
- jobConf.setMaxReduceAttempts(20);
+ JobConf jobConf = job.setupJobConf(1, 1, 10000, 4000, 100, 100);
RunningJob runJob = jobClient.submitJob(jobConf);
- JobID id = runJob.getID();
- JobInfo jInfo = remoteJTClient.getJobInfo(id);
- int counter = 0;
- while (counter < 60) {
- if (jInfo.getStatus().getRunState() == JobStatus.RUNNING) {
- break;
- } else {
- UtilsForTests.waitFor(1000);
- jInfo = remoteJTClient.getJobInfo(id);
- }
- counter ++;
- }
- Assert.assertTrue("Job has not been started for 1 min.", counter != 60);
-
- TaskInfo[] taskInfos = remoteJTClient.getTaskInfo(id);
+ JobID jobId = runJob.getID();
+ JobInfo jInfo = remoteJTClient.getJobInfo(jobId);
+ Assert.assertTrue("Job has not been started for 1 min.",
+ jtClient.isJobStarted(jobId));
+ TaskInfo[] taskInfos = remoteJTClient.getTaskInfo(jobId);
for (TaskInfo taskinfo : taskInfos) {
- if (!taskinfo.isSetupOrCleanup()) {
+ if (!taskinfo.isSetupOrCleanup() && taskinfo.getTaskID().isMap()) {
taskInfo = taskinfo;
+ break;
}
}
+ Assert.assertTrue("Task has not been started for 1 min.",
+ jtClient.isTaskStarted(taskInfo));
- counter = 0;
- taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
- while (counter < 60) {
- if (taskInfo.getTaskStatus().length > 0) {
- if (taskInfo.getTaskStatus()[0].getRunState()
- == TaskStatus.State.RUNNING) {
- break;
- }
- }
- UtilsForTests.waitFor(1000);
- taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
- counter++;
- }
- Assert.assertTrue("Task has not been started for 1 min.", counter != 60);
-
+ // Fail the running task.
NetworkedJob networkJob = jobClient.new NetworkedJob(jInfo.getStatus());
TaskID tID = TaskID.downgrade(taskInfo.getTaskID());
TaskAttemptID taskAttID = new TaskAttemptID(tID , 0);
- networkJob.killTask(taskAttID, false);
+ networkJob.killTask(taskAttID, true);
LOG.info("Waiting till the job is completed...");
while (!jInfo.getStatus().isJobComplete()) {
UtilsForTests.waitFor(100);
- jInfo = remoteJTClient.getJobInfo(id);
+ jInfo = remoteJTClient.getJobInfo(jobId);
}
-
- Assert.assertEquals("JobStatus", jInfo.getStatus().getRunState(),
- JobStatus.SUCCEEDED);
+ Assert.assertEquals("JobStatus", JobStatus.SUCCEEDED,
+ jInfo.getStatus().getRunState());
}
@@ -151,7 +126,6 @@ public class TestTaskKilling {
boolean isTempFolderExists = false;
String localTaskDir = null;
TTClient ttClient = null;
- TaskID tID = null;
FileStatus filesStatus [] = null;
Path inputDir = new Path("input");
Path outputDir = new Path("output");
@@ -164,8 +138,6 @@ public class TestTaskKilling {
jconf.setReducerClass(WordCount.Reduce.class);
jconf.setNumMapTasks(1);
jconf.setNumReduceTasks(1);
- jconf.setMaxMapAttempts(20);
- jconf.setMaxReduceAttempts(20);
jconf.setOutputKeyClass(Text.class);
jconf.setOutputValueClass(IntWritable.class);
@@ -177,61 +149,46 @@ public class TestTaskKilling {
RunningJob runJob = jobClient.submitJob(jconf);
JobID id = runJob.getID();
JobInfo jInfo = remoteJTClient.getJobInfo(id);
- int counter = 0;
- while (counter < 60) {
- if (jInfo.getStatus().getRunState() == JobStatus.RUNNING) {
- break;
- } else {
- UtilsForTests.waitFor(1000);
- jInfo = remoteJTClient.getJobInfo(id);
- }
- counter ++;
- }
- Assert.assertTrue("Job has not been started for 1 min.", counter != 60);
+ Assert.assertTrue("Job has not been started for 1 min.",
+ jtClient.isJobStarted(id));
JobStatus[] jobStatus = jobClient.getAllJobs();
String userName = jobStatus[0].getUsername();
TaskInfo[] taskInfos = remoteJTClient.getTaskInfo(id);
for (TaskInfo taskinfo : taskInfos) {
- if (!taskinfo.isSetupOrCleanup()) {
+ if (!taskinfo.isSetupOrCleanup() && taskinfo.getTaskID().isMap()) {
taskInfo = taskinfo;
break;
}
}
- counter = 0;
- while (counter < 30) {
- if (taskInfo.getTaskStatus().length > 0) {
- if (taskInfo.getTaskStatus()[0].getRunState()
- == TaskStatus.State.RUNNING) {
- break;
- }
- }
- UtilsForTests.waitFor(1000);
- taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
- counter ++;
- }
- Assert.assertTrue("Task has not been started for 30 sec.",
- counter != 30);
+ Assert.assertTrue("Task has not been started for 1 min.",
+ jtClient.isTaskStarted(taskInfo));
- tID = TaskID.downgrade(taskInfo.getTaskID());
+ TaskID tID = TaskID.downgrade(taskInfo.getTaskID());
FinishTaskControlAction action = new FinishTaskControlAction(tID);
String[] taskTrackers = taskInfo.getTaskTrackers();
- counter = 0;
- while (counter < 30) {
- if (taskTrackers.length != 0) {
+ int counter = 0;
+ TaskInfo prvTaskInfo = taskInfo;
+ while (counter++ < 30) {
+ if (taskTrackers.length > 0) {
break;
+ } else {
+ UtilsForTests.waitFor(100);
+ taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
+ if (taskInfo == null) {
+ taskInfo = prvTaskInfo;
+ } else {
+ prvTaskInfo = taskInfo;
+ }
+ taskTrackers = taskInfo.getTaskTrackers();
}
- UtilsForTests.waitFor(100);
- taskTrackers = taskInfo.getTaskTrackers();
- counter ++;
}
-
+ Assert.assertTrue("TaskTracker is not found.", taskTrackers.length > 0);
String hostName = taskTrackers[0].split("_")[1];
hostName = hostName.split(":")[0];
- ttClient = cluster.getTTClient(hostName);
- ttClient.getProxy().sendAction(action);
+ ttClient = cluster.getTTClient(hostName);
String localDirs[] = ttClient.getMapredLocalDirs();
TaskAttemptID taskAttID = new TaskAttemptID(tID, 0);
for (String localDir : localDirs) {
@@ -241,46 +198,49 @@ public class TestTaskKilling {
filesStatus = ttClient.listStatus(localTaskDir, true);
if (filesStatus.length > 0) {
isTempFolderExists = true;
- NetworkedJob networkJob = jobClient.new NetworkedJob(jInfo.getStatus());
- networkJob.killTask(taskAttID, false);
break;
}
}
-
+
Assert.assertTrue("Task Attempt directory " +
taskAttID + " has not been found while task was running.",
isTempFolderExists);
+
+ NetworkedJob networkJob = jobClient.new NetworkedJob(jInfo.getStatus());
+ networkJob.killTask(taskAttID, false);
+ ttClient.getProxy().sendAction(action);
+ taskInfo = remoteJTClient.getTaskInfo(tID);
+ while(taskInfo.getTaskStatus()[0].getRunState() ==
+ TaskStatus.State.RUNNING) {
+ UtilsForTests.waitFor(1000);
+ taskInfo = remoteJTClient.getTaskInfo(tID);
+ }
+ UtilsForTests.waitFor(1000);
+ taskInfo = remoteJTClient.getTaskInfo(tID);
+ Assert.assertTrue("Task status has not been changed to KILLED.",
+ (TaskStatus.State.KILLED ==
+ taskInfo.getTaskStatus()[0].getRunState()
+ || TaskStatus.State.KILLED_UNCLEAN ==
+ taskInfo.getTaskStatus()[0].getRunState()));
taskInfo = remoteJTClient.getTaskInfo(tID);
-
counter = 0;
- while (counter < 60) {
- UtilsForTests.waitFor(1000);
- taskInfo = remoteJTClient.getTaskInfo(tID);
+ while (counter++ < 60) {
filesStatus = ttClient.listStatus(localTaskDir, true);
if (filesStatus.length == 0) {
break;
+ } else {
+ UtilsForTests.waitFor(100);
}
- counter ++;
}
-
Assert.assertTrue("Task attempt temporary folder has not been cleaned.",
isTempFolderExists && filesStatus.length == 0);
- counter = 0;
- while (counter < 30) {
- UtilsForTests.waitFor(1000);
- taskInfo = remoteJTClient.getTaskInfo(tID);
- counter ++;
+ UtilsForTests.waitFor(1000);
+ jInfo = remoteJTClient.getJobInfo(id);
+ LOG.info("Waiting till the job is completed...");
+ while (!jInfo.getStatus().isJobComplete()) {
+ UtilsForTests.waitFor(100);
+ jInfo = remoteJTClient.getJobInfo(id);
}
- taskInfo = remoteJTClient.getTaskInfo(tID);
- Assert.assertEquals("Task status has not been changed to KILLED.",
- TaskStatus.State.KILLED,
- taskInfo.getTaskStatus()[0].getRunState());
- //Kill the job before testcase finishes.
- runJob.killJob();
-
- Assert.assertTrue("Job has not been stopped for 1 min.",
- ((cluster.getJTClient()).isJobStopped(id)));
-
}
private void cleanup(Path dir, Configuration conf) throws
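The loop added in the hunk above polls for a tracker assignment and guards against the JobTracker transiently returning a null TaskInfo, one of the flakiness sources this commit addresses. A hedged sketch of that pattern as a helper, assuming the system-test JTProtocol and TaskInfo interfaces used throughout the diff:

    import org.apache.hadoop.mapreduce.test.system.JTProtocol;
    import org.apache.hadoop.mapreduce.test.system.TaskInfo;

    public class WaitForTrackerSketch {
      // Re-queries the task up to 30 times until a tracker is assigned; if
      // the JobTracker momentarily returns null, the previous snapshot is
      // reused instead of being dereferenced.
      static TaskInfo waitForTracker(JTProtocol jt, TaskInfo taskInfo)
          throws Exception {
        TaskInfo prev = taskInfo;
        for (int i = 0; i < 30 && taskInfo.getTaskTrackers().length == 0; i++) {
          Thread.sleep(100);
          TaskInfo latest = jt.getTaskInfo(taskInfo.getTaskID());
          taskInfo = (latest == null) ? prev : latest;
          prev = taskInfo;
        }
        return taskInfo;
      }
    }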
@@ -323,81 +283,51 @@ public class TestTaskKilling {
TaskInfo taskInfo = null;
TaskID tID = null;
boolean isTempFolderExists = false;
- Path inputDir = new Path("input");
- Path outputDir = new Path("output");
- Configuration conf = new Configuration(cluster.getConf());
- JobConf jconf = new JobConf(conf);
- jconf.setJobName("Task Failed job");
- jconf.setJarByClass(UtilsForTests.class);
- jconf.setMapperClass(FailedMapperClass.class);
- jconf.setNumMapTasks(1);
- jconf.setNumReduceTasks(0);
- jconf.setMaxMapAttempts(1);
- cleanup(inputDir, conf);
- cleanup(outputDir, conf);
- createInput(inputDir, conf);
- FileInputFormat.setInputPaths(jconf, inputDir);
- FileOutputFormat.setOutputPath(jconf, outputDir);
- RunningJob runJob = jobClient.submitJob(jconf);
+ conf = remoteJTClient.getDaemonConf();
+ SleepJob job = new SleepJob();
+ job.setConf(conf);
+ JobConf jobConf = job.setupJobConf(1, 0, 10000,100, 10, 10);
+ RunningJob runJob = jobClient.submitJob(jobConf);
JobID id = runJob.getID();
JobInfo jInfo = remoteJTClient.getJobInfo(id);
-
- int counter = 0;
- while (counter < 60) {
- if (jInfo.getStatus().getRunState() == JobStatus.RUNNING) {
- break;
- } else {
- UtilsForTests.waitFor(1000);
- jInfo = remoteJTClient.getJobInfo(id);
- }
- counter ++;
- }
- Assert.assertTrue("Job has not been started for 1 min.", counter != 60);
+ Assert.assertTrue("Job has not been started for 1 min.",
+ jtClient.isJobStarted(id));
JobStatus[] jobStatus = jobClient.getAllJobs();
String userName = jobStatus[0].getUsername();
TaskInfo[] taskInfos = remoteJTClient.getTaskInfo(id);
for (TaskInfo taskinfo : taskInfos) {
- if (!taskinfo.isSetupOrCleanup()) {
+ if (!taskinfo.isSetupOrCleanup() && taskinfo.getTaskID().isMap()) {
taskInfo = taskinfo;
break;
}
}
-
+ Assert.assertTrue("Task has not been started for 1 min.",
+ jtClient.isTaskStarted(taskInfo));
+
tID = TaskID.downgrade(taskInfo.getTaskID());
FinishTaskControlAction action = new FinishTaskControlAction(tID);
String[] taskTrackers = taskInfo.getTaskTrackers();
- counter = 0;
- while (counter < 30) {
- if (taskTrackers.length != 0) {
+ int counter = 0;
+ TaskInfo prvTaskInfo = taskInfo;
+ while (counter++ < 30) {
+ if (taskTrackers.length > 0) {
break;
+ } else {
+ UtilsForTests.waitFor(1000);
+ taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
+ if (taskInfo == null) {
+ taskInfo = prvTaskInfo;
+ } else {
+ prvTaskInfo = taskInfo;
+ }
+ taskTrackers = taskInfo.getTaskTrackers();
}
- UtilsForTests.waitFor(1000);
- taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
- taskTrackers = taskInfo.getTaskTrackers();
- counter ++;
}
- Assert.assertTrue("Task tracker not found.", taskTrackers.length != 0);
+ Assert.assertTrue("Task tracker not found.", taskTrackers.length > 0);
String hostName = taskTrackers[0].split("_")[1];
hostName = hostName.split(":")[0];
ttClient = cluster.getTTClient(hostName);
- ttClient.getProxy().sendAction(action);
-
- counter = 0;
- while(counter < 60) {
- if (taskInfo.getTaskStatus().length > 0) {
- if (taskInfo.getTaskStatus()[0].getRunState()
- == TaskStatus.State.RUNNING) {
- break;
- }
- }
- UtilsForTests.waitFor(1000);
- taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
- counter ++;
- }
- Assert.assertTrue("Task has not been started for 1 min.",
- counter != 60);
-
String localDirs[] = ttClient.getMapredLocalDirs();
TaskAttemptID taskAttID = new TaskAttemptID(tID, 0);
for (String localDir : localDirs) {
@@ -409,57 +339,49 @@ public class TestTaskKilling {
isTempFolderExists = true;
break;
}
- }
-
- taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
+ }
+
Assert.assertTrue("Task Attempt directory " +
taskAttID + " has not been found while task was running.",
isTempFolderExists);
- counter = 0;
- while (counter < 30) {
- if (taskInfo.getTaskStatus().length > 0) {
- break;
+ boolean isFailTask = false;
+ JobInfo jobInfo = remoteJTClient.getJobInfo(id);
+ int MAX_MAP_TASK_ATTEMPTS = Integer.parseInt(
+ jobConf.get("mapred.map.max.attempts"));
+ if (!isFailTask) {
+ TaskID taskId = TaskID.downgrade(taskInfo.getTaskID());
+ TaskAttemptID tAttID = new TaskAttemptID(taskId,
+ taskInfo.numFailedAttempts());
+ while(taskInfo.numFailedAttempts() < MAX_MAP_TASK_ATTEMPTS) {
+ NetworkedJob networkJob = jtClient.getClient().
+ new NetworkedJob(jobInfo.getStatus());
+ networkJob.killTask(taskAttID, true);
+ taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
+ taskAttID = new TaskAttemptID(taskId, taskInfo.numFailedAttempts());
+ }
+ isFailTask=true;
}
- UtilsForTests.waitFor(1000);
- taskInfo = remoteJTClient.getTaskInfo(tID);
- counter ++;
- }
-
- while (taskInfo.getTaskStatus()[0].getRunState() ==
- TaskStatus.State.RUNNING) {
- UtilsForTests.waitFor(1000);
- taskInfo = remoteJTClient.getTaskInfo(tID);
- }
- Assert.assertEquals("Task status has not been changed to FAILED.",
- taskInfo.getTaskStatus()[0].getRunState(),
- TaskStatus.State.FAILED);
-
+
+ ttClient.getProxy().sendAction(action);
+ taskInfo = remoteJTClient.getTaskInfo(tID);
+ Assert.assertTrue("Task status has not been changed to FAILED.",
+ TaskStatus.State.FAILED ==
+ taskInfo.getTaskStatus()[0].getRunState()
+ || TaskStatus.State.FAILED_UNCLEAN ==
+ taskInfo.getTaskStatus()[0].getRunState());
+ UtilsForTests.waitFor(1000);
filesStatus = ttClient.listStatus(localTaskDir, true);
Assert.assertTrue("Temporary folder has not been cleanup.",
filesStatus.length == 0);
-
- }
-
- public static class FailedMapperClass implements
- Mapper<NullWritable, NullWritable, NullWritable, NullWritable> {
- public void configure(JobConf job) {
- }
- public void map(NullWritable key, NullWritable value,
- OutputCollector<NullWritable, NullWritable> output,
- Reporter reporter) throws IOException {
- int counter = 0;
- while (counter < 240) {
- UtilsForTests.waitFor(1000);
- counter ++;
- }
- if (counter == 240) {
- throw new IOException();
- }
- }
- public void close() {
+ UtilsForTests.waitFor(1000);
+ jInfo = remoteJTClient.getJobInfo(id);
+ LOG.info("Waiting till the job is completed...");
+ while (!jInfo.getStatus().isJobComplete()) {
+ UtilsForTests.waitFor(100);
+ jInfo = remoteJTClient.getJobInfo(id);
}
}
-
+
@Test
/**
* This tests verification of job killing by killing of all task