You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sa...@apache.org on 2014/01/17 18:43:00 UTC
svn commit: r1559201 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/
hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/
Author: sandy
Date: Fri Jan 17 17:43:00 2014
New Revision: 1559201
URL: http://svn.apache.org/r1559201
Log:
MAPREDUCE-5650. Job fails when hprof mapreduce.task.profile.map/reduce.params is specified (Gera Shegalov via Sandy Ryza)
Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1559201&r1=1559200&r2=1559201&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Fri Jan 17 17:43:00 2014
@@ -334,6 +334,9 @@ Release 2.3.0 - UNRELEASED
MAPREDUCE-5674. Missing start and finish time in mapred.JobStatus.
(Chuan Liu via cnauroth)
+ MAPREDUCE-5650. Job fails when hprof mapreduce.task.profile.map/reduce.params
+ is specified (Gera Shegalov via Sandy Ryza)
+
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java?rev=1559201&r1=1559200&r2=1559201&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java Fri Jan 17 17:43:00 2014
@@ -212,19 +212,11 @@ public class MapReduceChildJVM {
if (conf.getProfileEnabled()) {
if (conf.getProfileTaskRange(task.isMapTask()
).isIncluded(task.getPartition())) {
- vargs.add(
- String.format(
- conf.getProfileParams(),
- getTaskLogFile(TaskLog.LogName.PROFILE)
- )
- );
- if (task.isMapTask()) {
- vargs.add(conf.get(MRJobConfig.TASK_MAP_PROFILE_PARAMS, ""));
- }
- else {
- vargs.add(conf.get(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, ""));
- }
-
+ final String profileParams = conf.get(task.isMapTask()
+ ? MRJobConfig.TASK_MAP_PROFILE_PARAMS
+ : MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, conf.getProfileParams());
+ vargs.add(String.format(profileParams,
+ getTaskLogFile(TaskLog.LogName.PROFILE)));
}
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1559201&r1=1559200&r2=1559201&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Fri Jan 17 17:43:00 2014
@@ -603,6 +603,31 @@
</property>
<property>
+ <name>mapreduce.task.profile.params</name>
+ <value></value>
+ <description>JVM profiler parameters used to profile map and reduce task
+ attempts. This string may contain a single format specifier %s that will
+ be replaced by the path to profile.out in the task attempt log directory.
+ To specify different profiling options for map tasks and reduce tasks,
+ more specific parameters mapreduce.task.profile.map.params and
+ mapreduce.task.profile.reduce.params should be used.</description>
+ </property>
+
+ <property>
+ <name>mapreduce.task.profile.map.params</name>
+ <value>${mapreduce.task.profile.params}</value>
+ <description>Map-task-specific JVM profiler parameters. See
+ mapreduce.task.profile.params</description>
+ </property>
+
+ <property>
+ <name>mapreduce.task.profile.reduce.params</name>
+ <value>${mapreduce.task.profile.params}</value>
+ <description>Reduce-task-specific JVM profiler parameters. See
+ mapreduce.task.profile.params</description>
+ </property>
+
+ <property>
<name>mapreduce.task.skip.start.attempts</name>
<value>2</value>
<description> The number of Task attempts AFTER which skip mode
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java?rev=1559201&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java Fri Jan 17 17:43:00 2014
@@ -0,0 +1,244 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2;
+
+import java.io.*;
+import java.util.*;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.SleepJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestMRJobsWithProfiler {
+
+ private static final Log LOG =
+ LogFactory.getLog(TestMRJobsWithProfiler.class);
+
+ private static final EnumSet<RMAppState> TERMINAL_RM_APP_STATES =
+ EnumSet.of(RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED);
+
+ private static MiniMRYarnCluster mrCluster;
+
+ private static final Configuration CONF = new Configuration();
+ private static final FileSystem localFs;
+ static {
+ try {
+ localFs = FileSystem.getLocal(CONF);
+ } catch (IOException io) {
+ throw new RuntimeException("problem getting local fs", io);
+ }
+ }
+
+ private static final Path TEST_ROOT_DIR =
+ new Path("target", TestMRJobs.class.getName() + "-tmpDir").
+ makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+
+ private static final Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
+
+ @Before
+ public void setup() throws InterruptedException, IOException {
+
+ if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+ LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ + " not found. Not running test.");
+ return;
+ }
+
+ if (mrCluster == null) {
+ mrCluster = new MiniMRYarnCluster(getClass().getName());
+ mrCluster.init(CONF);
+ mrCluster.start();
+ }
+
+ // Copy MRAppJar and make it private. TODO: FIXME. This is a hack to
+ // workaround the absent public discache.
+ localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
+ localFs.setPermission(APP_JAR, new FsPermission("700"));
+ }
+
+ @After
+ public void tearDown() {
+ if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+ LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ + " not found. Not running test.");
+ return;
+ }
+
+ if (mrCluster != null) {
+ mrCluster.stop();
+ }
+ }
+
+
+ @Test (timeout = 120000)
+ public void testProfiler() throws IOException, InterruptedException,
+ ClassNotFoundException {
+ if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+ LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ + " not found. Not running test.");
+ return;
+ }
+
+ final SleepJob sleepJob = new SleepJob();
+ final JobConf sleepConf = new JobConf(mrCluster.getConfig());
+
+ sleepConf.setProfileEnabled(true);
+ // profile map split 1
+ sleepConf.setProfileTaskRange(true, "1");
+ // profile reduce of map output partitions 1
+ sleepConf.setProfileTaskRange(false, "1");
+
+ // use hprof for map to profile.out
+ sleepConf.set(MRJobConfig.TASK_MAP_PROFILE_PARAMS,
+ "-agentlib:hprof=cpu=times,heap=sites,force=n,thread=y,verbose=n,"
+ + "file=%s");
+
+ // use Xprof for reduce to stdout
+ sleepConf.set(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, "-Xprof");
+ sleepJob.setConf(sleepConf);
+
+ // 2-map-2-reduce SleepJob
+ final Job job = sleepJob.createJob(2, 2, 500, 1, 500, 1);
+ job.setJarByClass(SleepJob.class);
+ job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+ job.waitForCompletion(true);
+ final JobId jobId = TypeConverter.toYarn(job.getJobID());
+ final ApplicationId appID = jobId.getAppId();
+ int pollElapsed = 0;
+ while (true) {
+ Thread.sleep(1000);
+ pollElapsed += 1000;
+
+ if (TERMINAL_RM_APP_STATES.contains(
+ mrCluster.getResourceManager().getRMContext().getRMApps().get(appID)
+ .getState())) {
+ break;
+ }
+
+ if (pollElapsed >= 60000) {
+ LOG.warn("application did not reach terminal state within 60 seconds");
+ break;
+ }
+ }
+ Assert.assertEquals(RMAppState.FINISHED, mrCluster.getResourceManager()
+ .getRMContext().getRMApps().get(appID).getState());
+
+ // Job finished, verify logs
+ //
+ final Configuration nmConf = mrCluster.getNodeManager(0).getConfig();
+
+ final String appIdStr = appID.toString();
+ final String appIdSuffix = appIdStr.substring(
+ "application_".length(), appIdStr.length());
+ final String containerGlob = "container_" + appIdSuffix + "_*_*";
+
+ final Map<TaskAttemptID,Path> taLogDirs = new HashMap<TaskAttemptID,Path>();
+ final Pattern taskPattern = Pattern.compile(
+ ".*Task:(attempt_"
+ + appIdSuffix + "_[rm]_" + "[0-9]+_[0-9]+).*");
+ for (String logDir :
+ nmConf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS))
+ {
+ // filter out MRAppMaster and create attemptId->logDir map
+ //
+ for (FileStatus fileStatus :
+ localFs.globStatus(new Path(logDir
+ + Path.SEPARATOR + appIdStr
+ + Path.SEPARATOR + containerGlob
+ + Path.SEPARATOR + TaskLog.LogName.SYSLOG)))
+ {
+ final BufferedReader br = new BufferedReader(
+ new InputStreamReader(localFs.open(fileStatus.getPath())));
+ String line;
+ while ((line = br.readLine()) != null) {
+ final Matcher m = taskPattern.matcher(line);
+ if (m.matches()) {
+ // found Task done message
+ taLogDirs.put(TaskAttemptID.forName(m.group(1)),
+ fileStatus.getPath().getParent());
+ break;
+ }
+ }
+ br.close();
+ }
+ }
+
+ Assert.assertEquals(4, taLogDirs.size()); // all 4 attempts found
+
+ for (Map.Entry<TaskAttemptID,Path> dirEntry : taLogDirs.entrySet()) {
+ final TaskAttemptID tid = dirEntry.getKey();
+ final Path profilePath = new Path(dirEntry.getValue(),
+ TaskLog.LogName.PROFILE.toString());
+ final Path stdoutPath = new Path(dirEntry.getValue(),
+ TaskLog.LogName.STDOUT.toString());
+ if (tid.getTaskType() == TaskType.MAP) {
+ if (tid.getTaskID().getId() == 1) {
+ // verify profile.out
+ final BufferedReader br = new BufferedReader(new InputStreamReader(
+ localFs.open(profilePath)));
+ final String line = br.readLine();
+ Assert.assertTrue("No hprof content found!",
+ line !=null && line.startsWith("JAVA PROFILE"));
+ br.close();
+ Assert.assertEquals(0L, localFs.getFileStatus(stdoutPath).getLen());
+ } else {
+ Assert.assertFalse("hprof file should not exist",
+ localFs.exists(profilePath));
+ }
+ } else {
+ Assert.assertFalse("hprof file should not exist",
+ localFs.exists(profilePath));
+ if (tid.getTaskID().getId() == 1) {
+ final BufferedReader br = new BufferedReader(new InputStreamReader(
+ localFs.open(stdoutPath)));
+ boolean flatProfFound = false;
+ String line;
+ while ((line = br.readLine()) != null) {
+ if (line.startsWith("Flat profile")) {
+ flatProfFound = true;
+ break;
+ }
+ }
+ br.close();
+ Assert.assertTrue("Xprof flat profile not found!", flatProfFound);
+ } else {
+ Assert.assertEquals(0L, localFs.getFileStatus(stdoutPath).getLen());
+ }
+ }
+ }
+ }
+}