Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:39:22 UTC
svn commit: r1077640 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
contrib/streaming/src/test/system/org/apache/hadoop/mapred/
test/system/java/org/apache/hadoop/mapred/
Author: omalley
Date: Fri Mar 4 04:39:22 2011
New Revision: 1077640
URL: http://svn.apache.org/viewvc?rev=1077640&view=rev
Log:
commit afba6e20c76fa75bb1f97e5fb4d26fc8de00364f
Author: Vinay Kumar Thota <vi...@yahoo-inc.com>
Date: Thu Aug 5 04:58:36 2010 +0000
3867536 Fix the instability of MR system tests
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestStreamingJobProcessTree.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestTaskKillingOfStreamingJob.java
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/HighRamJobHelper.java
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCacheFileReferenceCount.java
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobCacheDirectoriesCleanUp.java
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobSummary.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestStreamingJobProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestStreamingJobProcessTree.java?rev=1077640&r1=1077639&r2=1077640&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestStreamingJobProcessTree.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestStreamingJobProcessTree.java Fri Mar 4 04:39:22 2011
@@ -1,297 +1,351 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapreduce.test.system.JTClient;
-import org.apache.hadoop.mapreduce.test.system.JobInfo;
-import org.apache.hadoop.mapreduce.test.system.MRCluster;
-import org.apache.hadoop.mapreduce.test.system.TaskInfo;
-import org.apache.hadoop.mapreduce.test.system.JTProtocol;
-import org.apache.hadoop.streaming.StreamJob;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.regex.Pattern;
-import java.util.regex.Matcher;
-
-/**
- * Increase memory usage beyond the memory limits of a streaming job and
- * verify whether the TaskTracker logs the process-tree status
- * before killing the task.
- */
-public class TestStreamingJobProcessTree {
- private static final Log LOG = LogFactory
- .getLog(TestStreamingJobProcessTree.class);
- private static MRCluster cluster;
- private static Configuration conf = new Configuration();
- private static Path inputDir = new Path("input");
- private static Path outputDir = new Path("output");
-
- @BeforeClass
- public static void before() throws Exception {
- String [] excludeExpList = {"java.net.ConnectException",
- "java.io.IOException"};
- cluster = MRCluster.createCluster(conf);
- cluster.setExcludeExpList(excludeExpList);
- cluster.setUp();
- conf = cluster.getJTClient().getProxy().getDaemonConf();
- createInput(inputDir, conf);
- }
- @AfterClass
- public static void after() throws Exception {
- cleanup(inputDir, conf);
- cleanup(outputDir, conf);
- cluster.tearDown();
- }
-
- /**
- * Exceed the memory limit of the map task and verify whether the
- * TaskTracker logs the process-tree status before killing the task.
- * @throws IOException - If an I/O error occurs.
- */
- @Test
- public void testStreamingJobProcTreeCleanOfMapTask() throws
- IOException {
- String runtimeArgs [] = {
- "-D", "mapred.job.name=ProcTreeStreamJob",
- "-D", "mapred.map.tasks=1",
- "-D", "mapred.reduce.tasks=0",
- "-D", "mapred.map.max.attempts=1",
- "-D", "mapred.cluster.max.map.memory.mb=2",
- "-D", "mapred.cluster.map.memory.mb=1",
- "-D", "mapred.job.map.memory.mb=0.5"
- };
-
- String [] otherArgs = new String[] {
- "-input", inputDir.toString(),
- "-output", outputDir.toString(),
- "-mapper", "ProcessTree.sh",
- };
- JobID jobId = getJobId(runtimeArgs, otherArgs);
- LOG.info("Job ID:" + jobId);
- Assert.assertNotNull("Job ID not found for 1 min", jobId);
- Assert.assertTrue("Job has not been started for 1 min.",
- cluster.getJTClient().isJobStarted(jobId));
- TaskInfo taskInfo = getTaskInfo(jobId);
- Assert.assertNotNull("TaskInfo is null",taskInfo);
- Assert.assertTrue("Task has not been started for 1 min.",
- cluster.getJTClient().isTaskStarted(taskInfo));
- JTProtocol wovenClient = cluster.getJTClient().getProxy();
- int counter = 0;
- while (counter++ < 60) {
- if (taskInfo.getTaskStatus().length == 0) {
- UtilsForTests.waitFor(1000);
- taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID());
- }else if (taskInfo.getTaskStatus()[0].getRunState() ==
- TaskStatus.State.RUNNING) {
- UtilsForTests.waitFor(1000);
- taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID());
- }
- }
-
- verifyProcessTreeOverLimit(taskInfo,jobId);
- }
-
- /**
- * Exceed the memory limit of the reduce task and verify whether the
- * TaskTracker logs the process-tree status before killing the task.
- * @throws IOException - If an I/O error occurs.
- */
- @Test
- public void testStreamingJobProcTreeCleanOfReduceTask() throws
- IOException {
- String runtimeArgs [] = {
- "-D", "mapred.job.name=ProcTreeStreamJob",
- "-D", "mapred.reduce.tasks=2",
- "-D", "mapred.reduce.max.attempts=1",
- "-D", "mapred.cluster.max.reduce.memory.mb=2",
- "-D", "mapred.cluster.reduce.memory.mb=1",
- "-D","mapred.job.reduce.memory.mb=0.5"};
-
- String [] otherArgs = new String[] {
- "-input", inputDir.toString(),
- "-output", outputDir.toString(),
- "-reducer", "ProcessTree.sh"
- };
-
- JobID jobId = getJobId(runtimeArgs, otherArgs);
- Assert.assertNotNull("Job ID not found for 1 min", jobId);
- Assert.assertTrue("Job has not been started for 1 min.",
- cluster.getJTClient().isJobStarted(jobId));
- TaskInfo taskInfo = getTaskInfo(jobId);
- Assert.assertNotNull("TaskInfo is null",taskInfo);
- Assert.assertTrue("Task has not been started for 1 min.",
- cluster.getJTClient().isTaskStarted(taskInfo));
- JTProtocol wovenClient = cluster.getJTClient().getProxy();
- int counter = 0;
- while (counter++ < 60) {
- if (taskInfo.getTaskStatus().length == 0) {
- UtilsForTests.waitFor(1000);
- taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID());
- }else if (taskInfo.getTaskStatus()[0].getRunState() ==
- TaskStatus.State.RUNNING) {
- UtilsForTests.waitFor(1000);
- taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID());
- }
- }
- verifyProcessTreeOverLimit(taskInfo,jobId);
- }
-
- private void verifyProcessTreeOverLimit(TaskInfo taskInfo, JobID jobId)
- throws IOException {
- String taskOverLimitPatternString =
- "TaskTree \\[pid=[0-9]*,tipID=.*\\] is "
- + "running beyond memory-limits. "
- + "Current usage : [0-9]*bytes. Limit : %sbytes. Killing task.";
- Pattern taskOverLimitPattern =
- Pattern.compile(String.format(taskOverLimitPatternString,
- String.valueOf(500 * 1024L)));
- LOG.info("Task OverLimit Pattern:" + taskOverLimitPattern);
- TaskID tID = TaskID.downgrade(taskInfo.getTaskID());
- TaskAttemptID taskAttID = new TaskAttemptID(tID, 0);
- JobClient jobClient = cluster.getJTClient().getClient();
- RunningJob runJob = jobClient.getJob(jobId);
- String[] taskDiagnostics = runJob.getTaskDiagnostics(taskAttID);
- Assert.assertNotNull("Task diagnostics is null.", taskDiagnostics);
- for (String strVal : taskDiagnostics) {
- Matcher mat = taskOverLimitPattern.matcher(strVal);
- Assert.assertTrue("Taskover limit error message is not matched.",
- mat.find());
- }
- }
-
- private String[] buildArgs(String [] runtimeArgs, String[] otherArgs) {
- String shellFile = System.getProperty("user.dir") +
- "/src/test/system/scripts/ProcessTree.sh";
-
- String fileArgs[] = new String[] {"-files", shellFile };
- int size = fileArgs.length + runtimeArgs.length + otherArgs.length;
- String args[]= new String[size];
- int index = 0;
- for (String fileArg : fileArgs) {
- args[index++] = fileArg;
- }
- for (String runtimeArg : runtimeArgs) {
- args[index++] = runtimeArg;
- }
- for (String otherArg : otherArgs) {
- args[index++] = otherArg;
- }
- return args;
- }
-
- private JobID getJobId(String [] runtimeArgs, String [] otherArgs)
- throws IOException {
- JobID jobId = null;
- final RunStreamJob runSJ;
- StreamJob streamJob = new StreamJob();
- int counter = 0;
- JTClient jtClient = cluster.getJTClient();
- JobClient jobClient = jtClient.getClient();
- int totalJobs = jobClient.getAllJobs().length;
- String [] args = buildArgs(runtimeArgs, otherArgs);
- cleanup(outputDir, conf);
- runSJ = new RunStreamJob(conf, streamJob, args);
- runSJ.start();
- while (counter++ < 60) {
- if (jobClient.getAllJobs().length - totalJobs == 0) {
- UtilsForTests.waitFor(1000);
- } else if (jobClient.getAllJobs()[0].getRunState() == JobStatus.RUNNING) {
- jobId = jobClient.getAllJobs()[0].getJobID();
- break;
- } else {
- UtilsForTests.waitFor(1000);
- }
- }
- return jobId;
- }
-
- private TaskInfo getTaskInfo(JobID jobId)
- throws IOException {
- JTProtocol wovenClient = cluster.getJTClient().getProxy();
- JobInfo jInfo = wovenClient.getJobInfo(jobId);
- JobStatus jobStatus = jInfo.getStatus();
- // Make sure that map is running and start progress 10%.
- while (jobStatus.mapProgress() < 0.1f) {
- UtilsForTests.waitFor(100);
- jobStatus = wovenClient.getJobInfo(jobId).getStatus();
- }
- TaskInfo[] taskInfos = wovenClient.getTaskInfo(jobId);
- for (TaskInfo taskinfo : taskInfos) {
- if (!taskinfo.isSetupOrCleanup()) {
- return taskinfo;
- }
- }
- return null;
- }
-
- private static void createInput(Path inDir, Configuration conf)
- throws IOException {
- FileSystem fs = inDir.getFileSystem(conf);
- if (!fs.mkdirs(inDir)) {
- throw new IOException("Failed to create the input directory:"
- + inDir.toString());
- }
- fs.setPermission(inDir, new FsPermission(FsAction.ALL,
- FsAction.ALL, FsAction.ALL));
- DataOutputStream file = fs.create(new Path(inDir, "data.txt"));
- String input="Process tree cleanup of Streaming job tasks.";
- file.writeBytes(input + "\n");
- file.close();
- }
-
- private static void cleanup(Path dir, Configuration conf)
- throws IOException {
- FileSystem fs = dir.getFileSystem(conf);
- fs.delete(dir, true);
- }
-
- class RunStreamJob extends Thread {
- Configuration jobConf;
- Tool tool;
- String [] args;
- public RunStreamJob(Configuration jobConf, Tool tool,
- String [] args) {
- this.jobConf = jobConf;
- this.tool = tool;
- this.args = args;
- }
- public void run() {
- try {
- ToolRunner.run(jobConf, tool, args);
- } catch(InterruptedException iexp) {
- LOG.warn("Thread is interrupted:" + iexp.getMessage());
- } catch(Exception exp) {
- LOG.warn("Exception:" + exp.getMessage());
- }
- }
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.test.system.JTClient;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.streaming.StreamJob;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
+
+/**
+ * Increase memory usage beyond the memory limits of a streaming job and
+ * verify whether the TaskTracker logs the process-tree status
+ * before killing the task.
+ */
+public class TestStreamingJobProcessTree {
+ private static final Log LOG = LogFactory
+ .getLog(TestStreamingJobProcessTree.class);
+ private static MRCluster cluster;
+ private static Configuration conf = new Configuration();
+ private static Path inputDir = new Path("input");
+ private static Path outputDir = new Path("output");
+
+ @BeforeClass
+ public static void before() throws Exception {
+ String [] excludeExpList = {"java.net.ConnectException",
+ "java.io.IOException"};
+ cluster = MRCluster.createCluster(conf);
+ cluster.setExcludeExpList(excludeExpList);
+ cluster.setUp();
+ conf = cluster.getJTClient().getProxy().getDaemonConf();
+ createInput(inputDir, conf);
+ }
+ @AfterClass
+ public static void after() throws Exception {
+ cleanup(inputDir, conf);
+ cleanup(outputDir, conf);
+ cluster.tearDown();
+ }
+
+ /**
+ * Exceed the memory limit of the map task and verify whether the
+ * TaskTracker logs the process-tree status before killing the task.
+ * @throws IOException - If an I/O error occurs.
+ */
+ @Test
+ public void testStreamingJobProcTreeCleanOfMapTask() throws
+ IOException {
+ String runtimeArgs [] = {
+ "-D", "mapred.job.name=ProcTreeStreamJob",
+ "-D", "mapred.map.tasks=1",
+ "-D", "mapred.reduce.tasks=0",
+ "-D", "mapred.map.max.attempts=1",
+ "-D", "mapred.cluster.max.map.memory.mb=2048",
+ "-D", "mapred.cluster.reduce.memory.mb=1024",
+ "-D", "mapred.cluster.max.reduce.memory.mb=2048",
+ "-D", "mapred.cluster.map.memory.mb=1024",
+ "-D", "mapred.job.map.memory.mb=512"
+ };
+
+ String [] otherArgs = new String[] {
+ "-input", inputDir.toString(),
+ "-output", outputDir.toString(),
+ "-mapper", "ProcessTree.sh",
+ };
+ JobID jobId = getJobId(runtimeArgs, otherArgs);
+ LOG.info("Job ID:" + jobId);
+ if (jobId == null) {
+ jobId = getJobId(runtimeArgs, otherArgs);
+ }
+ Assert.assertNotNull("Job ID not found for 1 min", jobId);
+ Assert.assertTrue("Job has not been started for 1 min.",
+ cluster.getJTClient().isJobStarted(jobId));
+ TaskInfo taskInfo = getTaskInfo(jobId, true);
+ Assert.assertNotNull("TaskInfo is null",taskInfo);
+ Assert.assertTrue("Task has not been started for 1 min.",
+ cluster.getJTClient().isTaskStarted(taskInfo));
+ JTProtocol wovenClient = cluster.getJTClient().getProxy();
+ int counter = 0;
+ TaskInfo tempTaskInfo;
+ while (counter++ < 60) {
+ if (taskInfo.getTaskStatus().length == 0) {
+ UtilsForTests.waitFor(1000);
+ tempTaskInfo = taskInfo;
+ taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID());
+ }else if (taskInfo.getTaskStatus()[0].getRunState() ==
+ TaskStatus.State.RUNNING) {
+ UtilsForTests.waitFor(1000);
+ tempTaskInfo = taskInfo;
+ taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID());
+ } else {
+ break;
+ }
+
+ if (taskInfo == null) {
+ taskInfo = tempTaskInfo;
+ break;
+ }
+ }
+
+ verifyProcessTreeOverLimit(taskInfo,jobId);
+ JobInfo jInfo = wovenClient.getJobInfo(jobId);
+ LOG.info("Waiting till the job is completed...");
+ counter = 0;
+ while (counter++ < 60) {
+ if (jInfo == null) {
+ break;
+ } else if (jInfo.getStatus().isJobComplete()) {
+ break;
+ }
+ UtilsForTests.waitFor(100);
+ jInfo = wovenClient.getJobInfo(jobId);
+ }
+ UtilsForTests.waitFor(1000);
+ }
+
+ /**
+ * Exceed the memory limit of the reduce task and verify whether the
+ * TaskTracker logs the process-tree status before killing the task.
+ * @throws IOException - If an I/O error occurs.
+ */
+ @Test
+ public void testStreamingJobProcTreeCleanOfReduceTask() throws
+ IOException {
+ String runtimeArgs [] = {
+ "-D", "mapred.job.name=ProcTreeStreamJob",
+ "-D", "mapred.reduce.tasks=1",
+ "-D", "mapred.map.tasks=1",
+ "-D", "mapred.reduce.max.attempts=1",
+ "-D", "mapred.cluster.max.map.memory.mb=2048",
+ "-D", "mapred.cluster.map.memory.mb=1024",
+ "-D", "mapred.cluster.max.reduce.memory.mb=20248",
+ "-D", "mapred.cluster.reduce.memory.mb=1024",
+ "-D", "mapred.job.reduce.memory.mb=512"};
+
+ String [] otherArgs = new String[] {
+ "-input", inputDir.toString(),
+ "-output", outputDir.toString(),
+ "-mapper", "/bin/cat",
+ "-reducer", "ProcessTree.sh"
+ };
+
+ cleanup(outputDir, conf);
+ JobID jobId = getJobId(runtimeArgs, otherArgs);
+ if (jobId == null) {
+ jobId = getJobId(runtimeArgs, otherArgs);
+ }
+ Assert.assertNotNull("Job ID not found for 1 min", jobId);
+ Assert.assertTrue("Job has not been started for 1 min.",
+ cluster.getJTClient().isJobStarted(jobId));
+ TaskInfo taskInfo = getTaskInfo(jobId, false);
+ Assert.assertNotNull("TaskInfo is null",taskInfo);
+ Assert.assertTrue("Task has not been started for 1 min.",
+ cluster.getJTClient().isTaskStarted(taskInfo));
+ JTProtocol wovenClient = cluster.getJTClient().getProxy();
+ int counter = 0;
+ TaskInfo tempTaskInfo;
+ while (counter++ < 60) {
+ if (taskInfo.getTaskStatus().length == 0) {
+ UtilsForTests.waitFor(1000);
+ tempTaskInfo = taskInfo;
+ taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID());
+ }else if (taskInfo.getTaskStatus()[0].getRunState() ==
+ TaskStatus.State.RUNNING) {
+ UtilsForTests.waitFor(1000);
+ tempTaskInfo = taskInfo;
+ taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID());
+ } else {
+ break;
+ }
+ if (taskInfo == null) {
+ taskInfo = tempTaskInfo;
+ break;
+ }
+ }
+ verifyProcessTreeOverLimit(taskInfo,jobId);
+ JobInfo jInfo = wovenClient.getJobInfo(jobId);
+ LOG.info("Waiting till the job is completed...");
+ counter = 0;
+ while (counter++ < 60) {
+ if(jInfo == null) {
+ break;
+ } else if (jInfo.getStatus().isJobComplete()) {
+ break;
+ }
+ UtilsForTests.waitFor(1000);
+ jInfo = wovenClient.getJobInfo(jobId);
+ }
+ }
+
+ private void verifyProcessTreeOverLimit(TaskInfo taskInfo, JobID jobId)
+ throws IOException {
+ String taskOverLimitPatternString =
+ "TaskTree \\[pid=[0-9]*,tipID=.*\\] is "
+ + "running beyond memory-limits. "
+ + "Current usage : [0-9]*bytes. Limit : %sbytes. Killing task.";
+ Pattern taskOverLimitPattern =
+ Pattern.compile(String.format(taskOverLimitPatternString,
+ String.valueOf(512 * 1024 * 1024L)));
+ LOG.info("Task OverLimit Pattern:" + taskOverLimitPattern);
+ TaskID tID = TaskID.downgrade(taskInfo.getTaskID());
+ TaskAttemptID taskAttID = new TaskAttemptID(tID, 0);
+ JobClient jobClient = cluster.getJTClient().getClient();
+ RunningJob runJob = jobClient.getJob(jobId);
+ String[] taskDiagnostics = runJob.getTaskDiagnostics(taskAttID);
+ Assert.assertNotNull("Task diagnostics is null.", taskDiagnostics);
+ for (String strVal : taskDiagnostics) {
+ Matcher mat = taskOverLimitPattern.matcher(strVal);
+ Assert.assertTrue("Taskover limit error message is not matched.",
+ mat.find());
+ }
+ }
+
+ private String[] buildArgs(String [] runtimeArgs, String[] otherArgs) {
+ String shellFile = System.getProperty("user.dir") +
+ "/src/test/system/scripts/ProcessTree.sh";
+
+ String fileArgs[] = new String[] {"-files", shellFile };
+ int size = fileArgs.length + runtimeArgs.length + otherArgs.length;
+ String args[]= new String[size];
+ int index = 0;
+ for (String fileArg : fileArgs) {
+ args[index++] = fileArg;
+ }
+ for (String runtimeArg : runtimeArgs) {
+ args[index++] = runtimeArg;
+ }
+ for (String otherArg : otherArgs) {
+ args[index++] = otherArg;
+ }
+ return args;
+ }
+
+ private JobID getJobId(String [] runtimeArgs, String [] otherArgs)
+ throws IOException {
+ JobID jobId = null;
+ final RunStreamJob runSJ;
+ StreamJob streamJob = new StreamJob();
+ int counter = 0;
+ JTClient jtClient = cluster.getJTClient();
+ JobClient jobClient = jtClient.getClient();
+ int totalJobs = jobClient.getAllJobs().length;
+ String [] args = buildArgs(runtimeArgs, otherArgs);
+ cleanup(outputDir, conf);
+ conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
+ runSJ = new RunStreamJob(conf, streamJob, args);
+ runSJ.start();
+ while (counter++ < 60) {
+ if (jobClient.getAllJobs().length - totalJobs == 0) {
+ UtilsForTests.waitFor(1000);
+ } else if (jobClient.getAllJobs()[0].getRunState() == JobStatus.RUNNING) {
+ jobId = jobClient.getAllJobs()[0].getJobID();
+ break;
+ } else {
+ UtilsForTests.waitFor(1000);
+ }
+ }
+ return jobId;
+ }
+
+ private TaskInfo getTaskInfo(JobID jobId, boolean isMap)
+ throws IOException {
+ JTProtocol wovenClient = cluster.getJTClient().getProxy();
+ JobInfo jInfo = wovenClient.getJobInfo(jobId);
+ TaskInfo[] taskInfos = wovenClient.getTaskInfo(jobId);
+ for (TaskInfo taskinfo : taskInfos) {
+ if (!taskinfo.isSetupOrCleanup()) {
+ if (taskinfo.getTaskID().isMap() == isMap) {
+ return taskinfo;
+ }
+ }
+ }
+ return null;
+ }
+
+ private static void createInput(Path inDir, Configuration conf)
+ throws IOException {
+ FileSystem fs = inDir.getFileSystem(conf);
+ if (!fs.mkdirs(inDir)) {
+ throw new IOException("Failed to create the input directory:"
+ + inDir.toString());
+ }
+ fs.setPermission(inDir, new FsPermission(FsAction.ALL,
+ FsAction.ALL, FsAction.ALL));
+ DataOutputStream file = fs.create(new Path(inDir, "data.txt"));
+ String input="Process tree cleanup of Streaming job tasks.";
+ file.writeBytes(input + "\n");
+ file.close();
+ }
+
+ private static void cleanup(Path dir, Configuration conf)
+ throws IOException {
+ FileSystem fs = dir.getFileSystem(conf);
+ fs.delete(dir, true);
+ }
+
+ class RunStreamJob extends Thread {
+ Configuration jobConf;
+ Tool tool;
+ String [] args;
+ public RunStreamJob(Configuration jobConf, Tool tool,
+ String [] args) {
+ this.jobConf = jobConf;
+ this.tool = tool;
+ this.args = args;
+ }
+ public void run() {
+ try {
+ ToolRunner.run(jobConf, tool, args);
+ } catch(InterruptedException iexp) {
+ LOG.warn("Thread is interrupted:" + iexp.getMessage());
+ } catch(Exception exp) {
+ LOG.warn("Exception:" + exp.getMessage());
+ }
+ }
+ }
+}
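
The core of the fix above is replacing the fractional memory settings (mapred.job.map.memory.mb=0.5, matched against 500 * 1024 bytes) with whole-megabyte values, so the over-limit pattern now has to match a limit of 512 MB expressed in bytes. A minimal, self-contained sketch of that pattern check, assuming a diagnostic line of the shape the TaskTracker emits (the sample pid, tip ID, and usage figure below are illustrative, not taken from the patch):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class OverLimitPatternCheck {
      public static void main(String[] args) {
        // Same pattern template as verifyProcessTreeOverLimit() above.
        String template =
            "TaskTree \\[pid=[0-9]*,tipID=.*\\] is "
            + "running beyond memory-limits. "
            + "Current usage : [0-9]*bytes. Limit : %sbytes. Killing task.";
        // mapred.job.map.memory.mb=512 translates to 512 * 1024 * 1024 bytes.
        long limitBytes = 512L * 1024 * 1024; // 536870912
        Pattern p = Pattern.compile(String.format(template, limitBytes));
        // Illustrative diagnostic line; real ones come from getTaskDiagnostics().
        String diag =
            "TaskTree [pid=12345,tipID=task_201103040439_0001_m_000000]"
            + " is running beyond memory-limits."
            + " Current usage : 620000000bytes. Limit : 536870912bytes."
            + " Killing task.";
        Matcher m = p.matcher(diag);
        System.out.println("over-limit message matched: " + m.find()); // true
      }
    }
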
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestTaskKillingOfStreamingJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestTaskKillingOfStreamingJob.java?rev=1077640&r1=1077639&r2=1077640&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestTaskKillingOfStreamingJob.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestTaskKillingOfStreamingJob.java Fri Mar 4 04:39:22 2011
@@ -217,6 +217,7 @@ public class TestTaskKillingOfStreamingJ
int totalJobs = client.getAllJobs().length;
String [] streamingArgs = generateArgs(runtimeArgs);
cleanup(outputDir, conf);
+ conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
final RunStreamingJob streamJobThread = new RunStreamingJob(conf,
streamJob,streamingArgs);
streamJobThread.start();
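
The single functional change above is the same knob this commit threads through several of the tests: by default the JobTracker cancels a job's delegation tokens once the job completes, and setting mapreduce.job.complete.cancel.delegation.tokens to false keeps them valid, which appears to be what these back-to-back system-test jobs need on the secure branch. A minimal sketch of the two ways the commit applies the setting (the property name comes from the diff; the surrounding setup is illustrative):

    import org.apache.hadoop.conf.Configuration;

    public class KeepDelegationTokens {
      public static void main(String[] args) {
        // Programmatically, as in the streaming tests:
        Configuration conf = new Configuration();
        conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens",
            false);

        // Or via generic options, as HighRamJobHelper does below:
        //   -D mapreduce.job.complete.cancel.delegation.tokens=false
        System.out.println(conf.getBoolean(
            "mapreduce.job.complete.cancel.delegation.tokens", true)); // false
      }
    }
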
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/HighRamJobHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/HighRamJobHelper.java?rev=1077640&r1=1077639&r2=1077640&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/HighRamJobHelper.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/HighRamJobHelper.java Fri Mar 4 04:39:22 2011
@@ -33,6 +33,7 @@ public class HighRamJobHelper {
String jobArgs []= {"-D","mapred.cluster.max.map.memory.mb=2048",
"-D","mapred.cluster.max.reduce.memory.mb=2048",
"-D","mapred.cluster.map.memory.mb=1024",
+ "-D","mapreduce.job.complete.cancel.delegation.tokens=false",
"-D","mapred.cluster.reduce.memory.mb=1024",
"-m", "6",
"-r", "2",
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCacheFileReferenceCount.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCacheFileReferenceCount.java?rev=1077640&r1=1077639&r2=1077640&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCacheFileReferenceCount.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCacheFileReferenceCount.java Fri Mar 4 04:39:22 2011
@@ -1,283 +1,283 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.test.system.JobInfo;
-import org.apache.hadoop.mapreduce.test.system.MRCluster;
-import org.apache.hadoop.mapreduce.test.system.JTProtocol;
-import org.apache.hadoop.mapreduce.test.system.JTClient;
-import org.apache.hadoop.mapreduce.test.system.TTClient;
-import org.apache.hadoop.mapreduce.test.system.TaskInfo;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.examples.SleepJob;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import java.net.URI;
-import java.io.DataOutputStream;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Assert;
-import org.junit.Test;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Verify the localization of job cache files.
- */
-public class TestCacheFileReferenceCount {
- private static final Log LOG = LogFactory
- .getLog(TestCacheFileReferenceCount.class);
- private static Configuration conf = new Configuration();
- private static MRCluster cluster;
- private static Path tmpFolderPath = null;
- private static String cacheFile1 = "cache1.txt";
- private static String cacheFile2 = "cache2.txt";
- private static String cacheFile3 = "cache3.txt";
- private static String cacheFile4 = "cache4.txt";
- private static JTProtocol wovenClient = null;
- private static JobClient jobClient = null;
- private static JTClient jtClient = null;
- private static URI cacheFileURI1;
- private static URI cacheFileURI2;
- private static URI cacheFileURI3;
- private static URI cacheFileURI4;
-
- @BeforeClass
- public static void before() throws Exception {
- cluster = MRCluster.createCluster(conf);
- cluster.setUp();
- tmpFolderPath = new Path("hdfs:///tmp");
- jtClient = cluster.getJTClient();
- jobClient = jtClient.getClient();
- wovenClient = cluster.getJTClient().getProxy();
- cacheFileURI1 = createCacheFile(tmpFolderPath, cacheFile1);
- cacheFileURI2 = createCacheFile(tmpFolderPath, cacheFile2);
- }
-
- @AfterClass
- public static void after() throws Exception {
- deleteCacheFile(new Path(tmpFolderPath, cacheFile1));
- deleteCacheFile(new Path(tmpFolderPath, cacheFile2));
- deleteCacheFile(new Path(tmpFolderPath, cacheFile4));
- cluster.tearDown();
- }
-
- /**
- * Run the job with two distributed cache files and verify
- * whether the job succeeds.
- * @throws Exception
- */
- @Test
- public void testCacheFilesLocalization() throws Exception {
- conf = wovenClient.getDaemonConf();
- SleepJob job = new SleepJob();
- job.setConf(conf);
- JobConf jobConf = job.setupJobConf(4, 1, 4000, 4000, 1000, 1000);
- DistributedCache.createSymlink(jobConf);
- DistributedCache.addCacheFile(cacheFileURI1, jobConf);
- DistributedCache.addCacheFile(cacheFileURI2, jobConf);
- RunningJob runJob = jobClient.submitJob(jobConf);
- JobID jobId = runJob.getID();
-
- Assert.assertTrue("Job has not been started for 1 min.",
- jtClient.isJobStarted(jobId));
- TaskInfo[] taskInfos = wovenClient.getTaskInfo(jobId);
- Assert.assertTrue("Cache File1 has not been localize",
- checkLocalization(taskInfos,cacheFile1));
- Assert.assertTrue("Cache File2 has not been localize",
- checkLocalization(taskInfos,cacheFile2));
- JobInfo jInfo = wovenClient.getJobInfo(jobId);
- LOG.info("Waiting till the job is completed...");
- while (!jInfo.getStatus().isJobComplete()) {
- UtilsForTests.waitFor(100);
- jInfo = wovenClient.getJobInfo(jobId);
- }
- Assert.assertEquals("Job has not been succeeded",
- jInfo.getStatus().getRunState(), JobStatus.SUCCEEDED);
- }
-
- /**
- * Run the job with distributed cache files and remove one cache
- * file from DFS after it is localized. Verify whether the job
- * fails.
- * @throws Exception
- */
- @Test
- public void testDeleteCacheFileInDFSAfterLocalized() throws Exception {
- conf = wovenClient.getDaemonConf();
- SleepJob job = new SleepJob();
- job.setConf(conf);
- JobConf jobConf = job.setupJobConf(4, 1, 4000, 4000, 1000, 1000);
- cacheFileURI3 = createCacheFile(tmpFolderPath, cacheFile3);
- DistributedCache.createSymlink(jobConf);
- DistributedCache.addCacheFile(cacheFileURI3, jobConf);
- RunningJob runJob = jobClient.submitJob(jobConf);
- JobID jobId = runJob.getID();
- Assert.assertTrue("Job has not been started for 1 min.",
- jtClient.isJobStarted(jobId));
- TaskInfo[] taskInfos = wovenClient.getTaskInfo(jobId);
- boolean iscacheFileLocalized = checkLocalization(taskInfos,cacheFile3);
- Assert.assertTrue("CacheFile has not been localized",
- iscacheFileLocalized);
- deleteCacheFile(new Path(tmpFolderPath, cacheFile3));
- JobInfo jInfo = wovenClient.getJobInfo(jobId);
- LOG.info("Waiting till the job is completed...");
- while (!jInfo.getStatus().isJobComplete()) {
- UtilsForTests.waitFor(100);
- jInfo = wovenClient.getJobInfo(jobId);
- }
- Assert.assertEquals("Job has not been failed",
- jInfo.getStatus().getRunState(), JobStatus.FAILED);
- }
-
- /**
- * Run the job with two distributed cache files, where the size of
- * one file is larger than local.cache.size. Verify
- * whether the job succeeds.
- * @throws Exception
- */
- @Test
- public void testCacheSizeExceeds() throws Exception {
- conf = wovenClient.getDaemonConf();
- SleepJob job = new SleepJob();
- String jobArgs []= {"-D","local.cache.size=1024",
- "-m", "4",
- "-r", "2",
- "-mt", "2000",
- "-rt", "2000",
- "-recordt","100"};
- JobConf jobConf = new JobConf(conf);
- cacheFileURI4 = createCacheFile(tmpFolderPath, cacheFile4);
- DistributedCache.createSymlink(jobConf);
- DistributedCache.addCacheFile(cacheFileURI4, jobConf);
- int countBeforeJS = jtClient.getClient().getAllJobs().length;
- JobID prvJobId = jtClient.getClient().getAllJobs()[0].getJobID();
- int exitCode = ToolRunner.run(jobConf,job,jobArgs);
- Assert.assertEquals("Exit Code:", 0, exitCode);
- int countAfterJS = jtClient.getClient().getAllJobs().length;
- int counter = 0;
- while (counter++ < 30 ) {
- if (countBeforeJS == countAfterJS) {
- UtilsForTests.waitFor(1000);
- countAfterJS = jtClient.getClient().getAllJobs().length;
- } else {
- break;
- }
- }
- JobID jobId = jtClient.getClient().getAllJobs()[0].getJobID();
- counter = 0;
- while (counter++ < 30) {
- if (jobId.toString().equals(prvJobId.toString())) {
- UtilsForTests.waitFor(1000);
- jobId = jtClient.getClient().getAllJobs()[0].getJobID();
- } else {
- break;
- }
- }
- JobInfo jInfo = wovenClient.getJobInfo(jobId);
- Assert.assertEquals("Job has not been succeeded",
- jInfo.getStatus().getRunState(), JobStatus.SUCCEEDED);
- }
-
- private boolean checkLocalization(TaskInfo[] taskInfos, String cacheFile)
- throws Exception {
- boolean iscacheFileLocalized = false;
- for (TaskInfo taskinfo : taskInfos) {
- if (!taskinfo.isSetupOrCleanup()) {
- String[] taskTrackers = taskinfo.getTaskTrackers();
- List<TTClient> ttList = getTTClients(taskTrackers);
- for (TTClient ttClient : ttList) {
- iscacheFileLocalized = checkCacheFile(ttClient,cacheFile);
- if(iscacheFileLocalized) {
- return true;
- }
- }
- }
- }
- return false;
- }
-
- private List<TTClient> getTTClients(String[] taskTrackers)
- throws Exception {
- List<TTClient> ttClientList= new ArrayList<TTClient>();
- for (String taskTracker: taskTrackers) {
- taskTracker = UtilsForTests.getFQDNofTT(taskTracker);
- TTClient ttClient = cluster.getTTClient(taskTracker);
- if (ttClient != null) {
- ttClientList.add(ttClient);
- }
- }
- return ttClientList;
- }
-
- private boolean checkCacheFile(TTClient ttClient, String cacheFile)
- throws IOException {
- String[] localDirs = ttClient.getMapredLocalDirs();
- for (String localDir : localDirs) {
- localDir = localDir + Path.SEPARATOR +
- TaskTracker.getPublicDistributedCacheDir();
- FileStatus[] fileStatuses = ttClient.listStatus(localDir,
- true, true);
- for (FileStatus fileStatus : fileStatuses) {
- Path path = fileStatus.getPath();
- if ((path.toString()).endsWith(cacheFile)) {
- return true;
- }
- }
- }
- return false;
- }
-
- private static void deleteCacheFile(Path cacheFilePath)
- throws IOException {
- FileSystem dfs = jobClient.getFs();
- dfs.delete(cacheFilePath, true);
- }
-
- private static URI createCacheFile(Path tmpFolderPath, String cacheFile)
- throws IOException {
- String input = "distribute cache content...";
- FileSystem dfs = jobClient.getFs();
- conf = wovenClient.getDaemonConf();
- FileSystem fs = tmpFolderPath.getFileSystem(conf);
- if (!fs.mkdirs(tmpFolderPath)) {
- throw new IOException("Failed to create the temp directory:"
- + tmpFolderPath.toString());
- }
- deleteCacheFile(new Path(tmpFolderPath, cacheFile));
- DataOutputStream file = fs.create(new Path(tmpFolderPath, cacheFile));
- int i = 0;
- while(i++ < 100) {
- file.writeBytes(input);
- }
- file.close();
- dfs.setPermission(tmpFolderPath, new FsPermission(FsAction.ALL,
- FsAction.ALL, FsAction.ALL));
- URI uri = URI.create(new Path(tmpFolderPath, cacheFile).toString());
- return uri;
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JTClient;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import java.net.URI;
+import java.io.DataOutputStream;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Assert;
+import org.junit.Test;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Verify the localization of job cache files.
+ */
+public class TestCacheFileReferenceCount {
+ private static final Log LOG = LogFactory
+ .getLog(TestCacheFileReferenceCount.class);
+ private static Configuration conf = new Configuration();
+ private static MRCluster cluster;
+ private static Path tmpFolderPath = null;
+ private static String cacheFile1 = "cache1.txt";
+ private static String cacheFile2 = "cache2.txt";
+ private static String cacheFile3 = "cache3.txt";
+ private static String cacheFile4 = "cache4.txt";
+ private static JTProtocol wovenClient = null;
+ private static JobClient jobClient = null;
+ private static JTClient jtClient = null;
+ private static URI cacheFileURI1;
+ private static URI cacheFileURI2;
+ private static URI cacheFileURI3;
+ private static URI cacheFileURI4;
+
+ @BeforeClass
+ public static void before() throws Exception {
+ cluster = MRCluster.createCluster(conf);
+ cluster.setUp();
+ tmpFolderPath = new Path("hdfs:///tmp");
+ jtClient = cluster.getJTClient();
+ jobClient = jtClient.getClient();
+ wovenClient = cluster.getJTClient().getProxy();
+ cacheFileURI1 = createCacheFile(tmpFolderPath, cacheFile1);
+ cacheFileURI2 = createCacheFile(tmpFolderPath, cacheFile2);
+ }
+
+ @AfterClass
+ public static void after() throws Exception {
+ deleteCacheFile(new Path(tmpFolderPath, cacheFile1));
+ deleteCacheFile(new Path(tmpFolderPath, cacheFile2));
+ deleteCacheFile(new Path(tmpFolderPath, cacheFile4));
+ cluster.tearDown();
+ }
+
+ /**
+ * Run the job with two distributed cache files and verify
+ * whether the job succeeds.
+ * @throws Exception
+ */
+ @Test
+ public void testCacheFilesLocalization() throws Exception {
+ conf = wovenClient.getDaemonConf();
+ SleepJob job = new SleepJob();
+ job.setConf(conf);
+ JobConf jobConf = job.setupJobConf(4, 1, 4000, 4000, 1000, 1000);
+ DistributedCache.createSymlink(jobConf);
+ DistributedCache.addCacheFile(cacheFileURI1, jobConf);
+ DistributedCache.addCacheFile(cacheFileURI2, jobConf);
+ RunningJob runJob = jobClient.submitJob(jobConf);
+ JobID jobId = runJob.getID();
+
+ Assert.assertTrue("Job has not been started for 1 min.",
+ jtClient.isJobStarted(jobId));
+ TaskInfo[] taskInfos = wovenClient.getTaskInfo(jobId);
+ Assert.assertTrue("Cache File1 has not been localize",
+ checkLocalization(taskInfos,cacheFile1));
+ Assert.assertTrue("Cache File2 has not been localize",
+ checkLocalization(taskInfos,cacheFile2));
+ JobInfo jInfo = wovenClient.getJobInfo(jobId);
+ LOG.info("Waiting till the job is completed...");
+ while (!jInfo.getStatus().isJobComplete()) {
+ UtilsForTests.waitFor(100);
+ jInfo = wovenClient.getJobInfo(jobId);
+ }
+ Assert.assertEquals("Job has not been succeeded",
+ jInfo.getStatus().getRunState(), JobStatus.SUCCEEDED);
+ }
+
+ /**
+ * Run the job with distributed cache files and remove one cache
+ * file from DFS after it is localized. Verify whether the job
+ * fails.
+ * @throws Exception
+ */
+ @Test
+ public void testDeleteCacheFileInDFSAfterLocalized() throws Exception {
+ conf = wovenClient.getDaemonConf();
+ SleepJob job = new SleepJob();
+ job.setConf(conf);
+ JobConf jobConf = job.setupJobConf(4, 1, 4000, 4000, 1000, 1000);
+ cacheFileURI3 = createCacheFile(tmpFolderPath, cacheFile3);
+ DistributedCache.createSymlink(jobConf);
+ DistributedCache.addCacheFile(cacheFileURI3, jobConf);
+ RunningJob runJob = jobClient.submitJob(jobConf);
+ JobID jobId = runJob.getID();
+ Assert.assertTrue("Job has not been started for 1 min.",
+ jtClient.isJobStarted(jobId));
+ TaskInfo[] taskInfos = wovenClient.getTaskInfo(jobId);
+ boolean iscacheFileLocalized = checkLocalization(taskInfos,cacheFile3);
+ Assert.assertTrue("CacheFile has not been localized",
+ iscacheFileLocalized);
+ deleteCacheFile(new Path(tmpFolderPath, cacheFile3));
+ JobInfo jInfo = wovenClient.getJobInfo(jobId);
+ LOG.info("Waiting till the job is completed...");
+ while (!jInfo.getStatus().isJobComplete()) {
+ UtilsForTests.waitFor(100);
+ jInfo = wovenClient.getJobInfo(jobId);
+ }
+ Assert.assertEquals("Job has not been failed",
+ jInfo.getStatus().getRunState(), JobStatus.FAILED);
+ }
+
+ /**
+ * Run the job with two distributed cache files, where the size of
+ * one file is larger than local.cache.size. Verify
+ * whether the job succeeds.
+ * @throws Exception
+ */
+ @Test
+ public void testCacheSizeExceeds() throws Exception {
+ conf = wovenClient.getDaemonConf();
+ SleepJob job = new SleepJob();
+ String jobArgs []= {"-D","local.cache.size=1024",
+ "-m", "4",
+ "-r", "2",
+ "-mt", "2000",
+ "-rt", "2000",
+ "-recordt","100"};
+ JobConf jobConf = new JobConf(conf);
+ cacheFileURI4 = createCacheFile(tmpFolderPath, cacheFile4);
+ DistributedCache.createSymlink(jobConf);
+ DistributedCache.addCacheFile(cacheFileURI4, jobConf);
+ int countBeforeJS = jtClient.getClient().getAllJobs().length;
+ JobID prvJobId = jtClient.getClient().getAllJobs()[0].getJobID();
+ int exitCode = ToolRunner.run(jobConf,job,jobArgs);
+ Assert.assertEquals("Exit Code:", 0, exitCode);
+ int countAfterJS = jtClient.getClient().getAllJobs().length;
+ int counter = 0;
+ while (counter++ < 30 ) {
+ if (countBeforeJS == countAfterJS) {
+ UtilsForTests.waitFor(1000);
+ countAfterJS = jtClient.getClient().getAllJobs().length;
+ } else {
+ break;
+ }
+ }
+ JobID jobId = jtClient.getClient().getAllJobs()[0].getJobID();
+ counter = 0;
+ while (counter++ < 30) {
+ if (jobId.toString().equals(prvJobId.toString())) {
+ UtilsForTests.waitFor(1000);
+ jobId = jtClient.getClient().getAllJobs()[0].getJobID();
+ } else {
+ break;
+ }
+ }
+ JobInfo jInfo = wovenClient.getJobInfo(jobId);
+ Assert.assertEquals("Job has not been succeeded",
+ jInfo.getStatus().getRunState(), JobStatus.SUCCEEDED);
+ }
+
+ private boolean checkLocalization(TaskInfo[] taskInfos, String cacheFile)
+ throws Exception {
+ boolean iscacheFileLocalized = false;
+ for (TaskInfo taskinfo : taskInfos) {
+ if (!taskinfo.isSetupOrCleanup()) {
+ String[] taskTrackers = taskinfo.getTaskTrackers();
+ List<TTClient> ttList = getTTClients(taskTrackers);
+ for (TTClient ttClient : ttList) {
+ iscacheFileLocalized = checkCacheFile(ttClient,cacheFile);
+ if(iscacheFileLocalized) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ private List<TTClient> getTTClients(String[] taskTrackers)
+ throws Exception {
+ List<TTClient> ttClientList= new ArrayList<TTClient>();
+ for (String taskTracker: taskTrackers) {
+ taskTracker = UtilsForTests.getFQDNofTT(taskTracker);
+ TTClient ttClient = cluster.getTTClient(taskTracker);
+ if (ttClient != null) {
+ ttClientList.add(ttClient);
+ }
+ }
+ return ttClientList;
+ }
+
+ private boolean checkCacheFile(TTClient ttClient, String cacheFile)
+ throws IOException {
+ String[] localDirs = ttClient.getMapredLocalDirs();
+ for (String localDir : localDirs) {
+ localDir = localDir + Path.SEPARATOR +
+ TaskTracker.getPublicDistributedCacheDir();
+ FileStatus[] fileStatuses = ttClient.listStatus(localDir,
+ true, true);
+ for (FileStatus fileStatus : fileStatuses) {
+ Path path = fileStatus.getPath();
+ if ((path.toString()).endsWith(cacheFile)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ private static void deleteCacheFile(Path cacheFilePath)
+ throws IOException {
+ FileSystem dfs = jobClient.getFs();
+ dfs.delete(cacheFilePath, true);
+ }
+
+ private static URI createCacheFile(Path tmpFolderPath, String cacheFile)
+ throws IOException {
+ String input = "distribute cache content...";
+ FileSystem dfs = jobClient.getFs();
+ conf = wovenClient.getDaemonConf();
+ FileSystem fs = tmpFolderPath.getFileSystem(conf);
+ if (!fs.mkdirs(tmpFolderPath)) {
+ throw new IOException("Failed to create the temp directory:"
+ + tmpFolderPath.toString());
+ }
+ deleteCacheFile(new Path(tmpFolderPath, cacheFile));
+ DataOutputStream file = fs.create(new Path(tmpFolderPath, cacheFile));
+ int i = 0;
+ while(i++ < 100) {
+ file.writeBytes(input);
+ }
+ file.close();
+ dfs.setPermission(new Path(tmpFolderPath, cacheFile), new FsPermission(FsAction.ALL,
+ FsAction.ALL, FsAction.ALL));
+ URI uri = URI.create(new Path(tmpFolderPath, cacheFile).toString());
+ return uri;
+ }
+}
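
The substantive change in this file is at the bottom of createCacheFile(): the world-readable permission is now applied to the cache file itself rather than to its parent directory. On the security branch a distributed-cache file is generally only treated as public, and therefore localized under TaskTracker.getPublicDistributedCacheDir() where checkCacheFile() looks for it, if the file itself is world-readable. A minimal sketch of the corrected pattern, assuming an illustrative HDFS path and a running cluster:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsAction;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class PublicCacheFilePermission {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path cacheFile = new Path("hdfs:///tmp/cache1.txt"); // illustrative
        FileSystem fs = cacheFile.getFileSystem(conf);
        // Set rwxrwxrwx on the file itself, not just on /tmp, so the
        // TaskTracker can localize it into the public cache directory.
        fs.setPermission(cacheFile,
            new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
      }
    }
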
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobCacheDirectoriesCleanUp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobCacheDirectoriesCleanUp.java?rev=1077640&r1=1077639&r2=1077640&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobCacheDirectoriesCleanUp.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobCacheDirectoriesCleanUp.java Fri Mar 4 04:39:22 2011
@@ -250,7 +250,13 @@ public class TestJobCacheDirectoriesClea
String localTaskDir = localDir + "/" +
TaskTracker.getLocalTaskDir(getUser(), jobId.toString(),
taskAttID.toString() + "/work/");
- if (ttClient.getFileStatus(localTaskDir,true).isDir()) {
+ boolean fstatus = false;
+ try {
+ fstatus = ttClient.getFileStatus(localTaskDir,true).isDir();
+ } catch(Exception exp) {
+ fstatus = false;
+ }
+ if (fstatus) {
ttClient.createFile(localTaskDir, customFile, permission, true);
ttClient.createFolder(localTaskDir, customFolder, permission, true);
list.add(localTaskDir + customFile);
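
The guard added above makes the cache-directory check tolerant of races with task cleanup: the status call throws, rather than returning null, when the path has already been removed, so the probe is wrapped and a missing directory is simply treated as false. The same defensive shape against a plain Hadoop FileSystem, with a stand-in path:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class DirStatusProbe {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        Path localTaskDir = new Path("/tmp/some/task/work"); // stand-in
        boolean isDir;
        try {
          // getFileStatus() throws FileNotFoundException for a missing path.
          isDir = fs.getFileStatus(localTaskDir).isDir();
        } catch (Exception e) {
          isDir = false; // already cleaned up: treat as not a directory
        }
        System.out.println(localTaskDir + " is a directory: " + isDir);
      }
    }
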
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobSummary.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobSummary.java?rev=1077640&r1=1077639&r2=1077640&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobSummary.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobSummary.java Fri Mar 4 04:39:22 2011
@@ -89,6 +89,7 @@ public class TestJobSummary {
public void testJobSummaryInfoOfKilledJob() throws IOException,
InterruptedException {
SleepJob job = new SleepJob();
+ conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
job.setConf(conf);
conf = job.setupJobConf(2, 1, 4000, 4000, 100, 100);
JobConf jobConf = new JobConf(conf);
@@ -113,6 +114,7 @@ public class TestJobSummary {
public void testJobSummaryInfoOfFailedJob() throws IOException,
InterruptedException {
conf = remoteJTClient.getDaemonConf();
+ conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
JobConf jobConf = new JobConf(conf);
jobConf.setJobName("Fail Job");
jobConf.setJarByClass(GenerateTaskChildProcess.class);
@@ -147,6 +149,7 @@ public class TestJobSummary {
SleepJob job = new SleepJob();
job.setConf(conf);
conf = job.setupJobConf(2, 1, 4000, 4000, 100, 100);
+ conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
JobConf jobConf = new JobConf(conf);
JobQueueInfo [] queues = jobClient.getQueues();
for (JobQueueInfo queueInfo : queues ){