You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:37:27 UTC
svn commit: r1077622 - in
/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system:
org/apache/hadoop/mapred/TestStreamingJobProcessTree.java
scripts/ProcessTree.sh
Author: omalley
Date: Fri Mar 4 04:37:27 2011
New Revision: 1077622
URL: http://svn.apache.org/viewvc?rev=1077622&view=rev
Log:
commit 2c397d379a2b3032473e7f9ef50d4f71b7a5278b
Author: Vinay Kumar Thota <vi...@yahoo-inc.com>
Date: Fri Jul 30 04:53:53 2010 +0000
MAPREDUCE:1963 from https://issues.apache.org/jira/secure/attachment/12450276/1963-ydist-security.patch
Added:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestStreamingJobProcessTree.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/scripts/ProcessTree.sh
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestStreamingJobProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestStreamingJobProcessTree.java?rev=1077622&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestStreamingJobProcessTree.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestStreamingJobProcessTree.java Fri Mar 4 04:37:27 2011
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.test.system.JTClient;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.streaming.StreamJob;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Increase memory usage beyond the memory limits of streaming job and
+ * verify whether task manager logs the process tree status
+ * before killing or not.
+ */
+public class TestStreamingJobProcessTree {
+  private static final Log LOG = LogFactory
+      .getLog(TestStreamingJobProcessTree.class);
+  /** Upper bound, in one-second polls, for every wait loop (about 1 min). */
+  private static final int MAX_WAIT_SECONDS = 60;
+  private static MRCluster cluster;
+  private static Configuration conf = new Configuration();
+  private static final Path inputDir = new Path("input");
+  private static final Path outputDir = new Path("output");
+
+  @BeforeClass
+  public static void before() throws Exception {
+    String[] excludeExpList = {"java.net.ConnectException",
+        "java.io.IOException"};
+    cluster = MRCluster.createCluster(conf);
+    cluster.setExcludeExpList(excludeExpList);
+    cluster.setUp();
+    // From here on, createInput/cleanup resolve their FileSystem from the
+    // JobTracker daemon's configuration rather than the local default.
+    conf = cluster.getJTClient().getProxy().getDaemonConf();
+    createInput(inputDir, conf);
+  }
+
+  @AfterClass
+  public static void after() throws Exception {
+    try {
+      cleanup(inputDir, conf);
+      cleanup(outputDir, conf);
+    } finally {
+      // Always release the cluster, even if file-system cleanup fails;
+      // cluster is null when createCluster itself failed in before().
+      if (cluster != null) {
+        cluster.tearDown();
+      }
+    }
+  }
+
+  /**
+   * Increase the memory limit for map task and verify whether the
+   * task manager logs the process tree status before killing or not.
+   * @throws IOException - If an I/O error occurs.
+   */
+  @Test
+  public void testStreamingJobProcTreeCleanOfMapTask() throws
+      IOException {
+    String[] runtimeArgs = {
+        "-D", "mapred.job.name=ProcTreeStreamJob",
+        "-D", "mapred.map.tasks=1",
+        "-D", "mapred.reduce.tasks=0",
+        "-D", "mapred.map.max.attempts=1",
+        "-D", "mapred.cluster.max.map.memory.mb=2",
+        "-D", "mapred.cluster.map.memory.mb=1",
+        "-D", "mapred.job.map.memory.mb=0.5"
+    };
+
+    String[] otherArgs = new String[] {
+        "-input", inputDir.toString(),
+        "-output", outputDir.toString(),
+        "-mapper", "ProcessTree.sh"
+    };
+    runAndVerify(runtimeArgs, otherArgs, true);
+  }
+
+  /**
+   * Increase the memory limit for reduce task and verify whether the
+   * task manager logs the process tree status before killing or not.
+   * @throws IOException - If an I/O error occurs.
+   */
+  @Test
+  public void testStreamingJobProcTreeCleanOfReduceTask() throws
+      IOException {
+    String[] runtimeArgs = {
+        "-D", "mapred.job.name=ProcTreeStreamJob",
+        "-D", "mapred.reduce.tasks=2",
+        "-D", "mapred.reduce.max.attempts=1",
+        "-D", "mapred.cluster.max.reduce.memory.mb=2",
+        "-D", "mapred.cluster.reduce.memory.mb=1",
+        "-D", "mapred.job.reduce.memory.mb=0.5"
+    };
+
+    String[] otherArgs = new String[] {
+        "-input", inputDir.toString(),
+        "-output", outputDir.toString(),
+        "-reducer", "ProcessTree.sh"
+    };
+    runAndVerify(runtimeArgs, otherArgs, false);
+  }
+
+  /**
+   * Submits the streaming job, waits for one of its map or reduce tasks to
+   * stop running, then checks that task's diagnostics.
+   * @param runtimeArgs generic "-D" options for the job.
+   * @param otherArgs streaming options (input, output, mapper/reducer).
+   * @param isMap true to inspect a map task, false for a reduce task.
+   * @throws IOException - If an I/O error occurs.
+   */
+  private void runAndVerify(String[] runtimeArgs, String[] otherArgs,
+      boolean isMap) throws IOException {
+    JobID jobId = getJobId(runtimeArgs, otherArgs);
+    LOG.info("Job ID:" + jobId);
+    Assert.assertNotNull("Job ID not found for 1 min", jobId);
+    Assert.assertTrue("Job has not been started for 1 min.",
+        cluster.getJTClient().isJobStarted(jobId));
+    TaskInfo taskInfo = getTaskInfo(jobId, isMap);
+    Assert.assertNotNull("TaskInfo is null", taskInfo);
+    Assert.assertTrue("Task has not been started for 1 min.",
+        cluster.getJTClient().isTaskStarted(taskInfo));
+    taskInfo = waitForTaskToLeaveRunningState(taskInfo);
+    verifyProcessTreeOverLimit(taskInfo, jobId);
+  }
+
+  /**
+   * Polls the JobTracker once per second, for at most MAX_WAIT_SECONDS
+   * polls, while the task has no status yet or is still RUNNING.
+   * @param taskInfo task to watch.
+   * @return the most recently fetched TaskInfo for that task.
+   * @throws IOException - If an I/O error occurs.
+   */
+  private TaskInfo waitForTaskToLeaveRunningState(TaskInfo taskInfo)
+      throws IOException {
+    JTProtocol wovenClient = cluster.getJTClient().getProxy();
+    TaskInfo current = taskInfo;
+    for (int poll = 0; poll < MAX_WAIT_SECONDS; poll++) {
+      TaskStatus[] statuses = current.getTaskStatus();
+      if (statuses.length > 0
+          && statuses[0].getRunState() != TaskStatus.State.RUNNING) {
+        // The task has left RUNNING; polling again cannot change anything.
+        break;
+      }
+      UtilsForTests.waitFor(1000);
+      current = wovenClient.getTaskInfo(current.getTaskID());
+    }
+    return current;
+  }
+
+  /**
+   * Asserts that the diagnostics of the task's first attempt report the
+   * task as running beyond its memory limit.
+   * @param taskInfo task whose first attempt (id 0) is inspected.
+   * @param jobId job that owns the task.
+   * @throws IOException - If an I/O error occurs.
+   */
+  private void verifyProcessTreeOverLimit(TaskInfo taskInfo, JobID jobId)
+      throws IOException {
+    String taskOverLimitPatternString =
+        "TaskTree \\[pid=[0-9]*,tipID=.*\\] is "
+        + "running beyond memory-limits. "
+        + "Current usage : [0-9]*bytes. Limit : %sbytes. Killing task.";
+    Pattern taskOverLimitPattern =
+        Pattern.compile(String.format(taskOverLimitPatternString,
+            String.valueOf(500 * 1024L)));
+    LOG.info("Task OverLimit Pattern:" + taskOverLimitPattern);
+    TaskID tID = TaskID.downgrade(taskInfo.getTaskID());
+    // Only one attempt is allowed (max.attempts=1), so attempt 0 is the one.
+    TaskAttemptID taskAttID = new TaskAttemptID(tID, 0);
+    JobClient jobClient = cluster.getJTClient().getClient();
+    RunningJob runJob = jobClient.getJob(jobId);
+    Assert.assertNotNull("Job not found: " + jobId, runJob);
+    String[] taskDiagnostics = runJob.getTaskDiagnostics(taskAttID);
+    Assert.assertNotNull("Task diagnostics is null.", taskDiagnostics);
+    // Without this, an empty array would let the loop below pass vacuously.
+    Assert.assertTrue("No task diagnostics found for " + taskAttID,
+        taskDiagnostics.length > 0);
+    for (String strVal : taskDiagnostics) {
+      Matcher mat = taskOverLimitPattern.matcher(strVal);
+      Assert.assertTrue("Taskover limit error message is not matched.",
+          mat.find());
+    }
+  }
+
+  /**
+   * Builds the full StreamJob argument list: "-files" with the local
+   * ProcessTree.sh script, then runtimeArgs, then otherArgs.
+   */
+  private String[] buildArgs(String[] runtimeArgs, String[] otherArgs) {
+    String shellFile = System.getProperty("user.dir") +
+        "/src/test/system/scripts/ProcessTree.sh";
+
+    String[] fileArgs = new String[] {"-files", shellFile};
+    String[] args = new String[fileArgs.length + runtimeArgs.length
+        + otherArgs.length];
+    int index = 0;
+    System.arraycopy(fileArgs, 0, args, index, fileArgs.length);
+    index += fileArgs.length;
+    System.arraycopy(runtimeArgs, 0, args, index, runtimeArgs.length);
+    index += runtimeArgs.length;
+    System.arraycopy(otherArgs, 0, args, index, otherArgs.length);
+    return args;
+  }
+
+  /**
+   * Deletes the output directory, starts the streaming job on a background
+   * thread and waits up to about a minute for it to appear as RUNNING.
+   * @return the id of the newly submitted job, or null if it was not seen
+   *         running in time.
+   * @throws IOException - If an I/O error occurs.
+   */
+  private JobID getJobId(String[] runtimeArgs, String[] otherArgs)
+      throws IOException {
+    JTClient jtClient = cluster.getJTClient();
+    JobClient jobClient = jtClient.getClient();
+    // Remember the jobs that already exist so the job submitted below can be
+    // told apart from jobs left behind by earlier tests.
+    Set<JobID> existingJobs = new HashSet<JobID>();
+    for (JobStatus status : jobClient.getAllJobs()) {
+      existingJobs.add(status.getJobID());
+    }
+    String[] args = buildArgs(runtimeArgs, otherArgs);
+    cleanup(outputDir, conf);
+    RunStreamJob runSJ = new RunStreamJob(conf, new StreamJob(), args);
+    runSJ.start();
+    for (int poll = 0; poll < MAX_WAIT_SECONDS; poll++) {
+      // Fetch the job list once per poll: each call is a round trip and the
+      // list may change between calls.
+      for (JobStatus status : jobClient.getAllJobs()) {
+        if (!existingJobs.contains(status.getJobID())
+            && status.getRunState() == JobStatus.RUNNING) {
+          return status.getJobID();
+        }
+      }
+      UtilsForTests.waitFor(1000);
+    }
+    return null;
+  }
+
+  /**
+   * Waits (bounded) for the maps to reach 10% progress, then returns the
+   * first non-setup/cleanup task of the requested type.
+   * @param jobId job whose tasks are listed.
+   * @param isMap true for a map task, false for a reduce task.
+   * @return the matching task, or null if the job has none.
+   * @throws IOException - If an I/O error occurs.
+   */
+  private TaskInfo getTaskInfo(JobID jobId, boolean isMap)
+      throws IOException {
+    JTProtocol wovenClient = cluster.getJTClient().getProxy();
+    JobInfo jInfo = wovenClient.getJobInfo(jobId);
+    JobStatus jobStatus = jInfo.getStatus();
+    // Wait for the maps to report at least 10% progress, but give up after
+    // about a minute or once the job has finished, so a job whose task was
+    // killed early cannot hang the test forever.
+    int polls = 0;
+    while (jobStatus.mapProgress() < 0.1f && !isFinished(jobStatus)
+        && polls++ < MAX_WAIT_SECONDS * 10) {
+      UtilsForTests.waitFor(100);
+      jobStatus = wovenClient.getJobInfo(jobId).getStatus();
+    }
+    TaskInfo[] taskInfos = wovenClient.getTaskInfo(jobId);
+    for (TaskInfo taskinfo : taskInfos) {
+      if (!taskinfo.isSetupOrCleanup()
+          && taskinfo.getTaskID().isMap() == isMap) {
+        return taskinfo;
+      }
+    }
+    return null;
+  }
+
+  /** Returns true once the job has reached a terminal run state. */
+  private static boolean isFinished(JobStatus jobStatus) {
+    int state = jobStatus.getRunState();
+    return state == JobStatus.SUCCEEDED || state == JobStatus.FAILED
+        || state == JobStatus.KILLED;
+  }
+
+  /**
+   * Creates inDir with rwx permissions for everyone and writes a one-line
+   * data.txt into it.
+   * @throws IOException if the directory cannot be created or written.
+   */
+  private static void createInput(Path inDir, Configuration conf)
+      throws IOException {
+    FileSystem fs = inDir.getFileSystem(conf);
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Failed to create the input directory:"
+          + inDir.toString());
+    }
+    fs.setPermission(inDir, new FsPermission(FsAction.ALL,
+        FsAction.ALL, FsAction.ALL));
+    DataOutputStream file = fs.create(new Path(inDir, "data.txt"));
+    try {
+      String input = "Process tree cleanup of Streaming job tasks.";
+      file.writeBytes(input + "\n");
+    } finally {
+      // Close even if the write fails so the stream is not leaked.
+      file.close();
+    }
+  }
+
+  /** Recursively deletes dir on the file system resolved from conf. */
+  private static void cleanup(Path dir, Configuration conf)
+      throws IOException {
+    FileSystem fs = dir.getFileSystem(conf);
+    fs.delete(dir, true);
+  }
+
+  /**
+   * Runs a Tool (here a StreamJob) through ToolRunner on its own thread, so
+   * the test can poll the JobTracker while the job is being submitted.
+   * Failures are logged, not rethrown.
+   */
+  static class RunStreamJob extends Thread {
+    final Configuration jobConf;
+    final Tool tool;
+    final String[] args;
+
+    public RunStreamJob(Configuration jobConf, Tool tool,
+        String[] args) {
+      this.jobConf = jobConf;
+      this.tool = tool;
+      this.args = args;
+    }
+
+    @Override
+    public void run() {
+      try {
+        ToolRunner.run(jobConf, tool, args);
+      } catch (InterruptedException iexp) {
+        LOG.warn("Thread is interrupted:" + iexp.getMessage());
+        // Restore the interrupt status rather than swallowing it.
+        Thread.currentThread().interrupt();
+      } catch (Exception exp) {
+        LOG.warn("Exception:" + exp.getMessage(), exp);
+      }
+    }
+  }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/scripts/ProcessTree.sh
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/scripts/ProcessTree.sh?rev=1077622&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/scripts/ProcessTree.sh (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/scripts/ProcessTree.sh Fri Mar 4 04:37:27 2011
@@ -0,0 +1,40 @@
+#!/bin/sh
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+##################################################################
+## Builds a process tree that stays alive until it is killed.   ##
+## Usage: ProcessTree.sh [string depth]                         ##
+## Each level re-runs this script as a child sh process with a  ##
+## longer string, until depth 5 is reached; the deepest process ##
+## prints the string and then sleeps forever.                   ##
+##################################################################
+
+# Trace each command to stderr for debugging.
+set -x
+
+if [ $# -eq 0 ]
+then
+  # Top-level invocation: start with the seed string at depth 1.
+  StrVal="Hadoop is framework for data intensive distributed applications. \
+Hadoop enables applications to work with thousands of nodes."
+  i=1
+else
+  StrVal=$1
+  # Fall back to depth 1 if the depth argument is missing or empty.
+  i=${2:-1}
+fi
+
+if [ "$i" -lt 5 ]
+then
+  # Run the next level as a child (not exec), so this shell stays alive as
+  # its parent and the tree grows by one process per level.
+  sh "$0" "$StrVal-AppendingStr" $((i + 1))
+else
+  echo "$StrVal"
+  # Keep the deepest process (and so the whole tree) alive until killed.
+  while true
+  do
+    sleep 5
+  done
+fi