You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sa...@apache.org on 2014/01/17 18:43:00 UTC

svn commit: r1559201 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/ hadoop-mapr...

Author: sandy
Date: Fri Jan 17 17:43:00 2014
New Revision: 1559201

URL: http://svn.apache.org/r1559201
Log:
MAPREDUCE-5650. Job fails when hprof mapreduce.task.profile.map/reduce.params is specified (Gera Shegalov via Sandy Ryza)

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1559201&r1=1559200&r2=1559201&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Fri Jan 17 17:43:00 2014
@@ -334,6 +334,9 @@ Release 2.3.0 - UNRELEASED
     MAPREDUCE-5674. Missing start and finish time in mapred.JobStatus.
     (Chuan Liu via cnauroth)
 
+    MAPREDUCE-5650. Job fails when hprof mapreduce.task.profile.map/reduce.params
+    is specified (Gera Shegalov via Sandy Ryza)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java?rev=1559201&r1=1559200&r2=1559201&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java Fri Jan 17 17:43:00 2014
@@ -212,19 +212,11 @@ public class MapReduceChildJVM {
     if (conf.getProfileEnabled()) {
       if (conf.getProfileTaskRange(task.isMapTask()
                                    ).isIncluded(task.getPartition())) {
-        vargs.add(
-            String.format(
-                conf.getProfileParams(), 
-                getTaskLogFile(TaskLog.LogName.PROFILE)
-                )
-            );
-        if (task.isMapTask()) {
-          vargs.add(conf.get(MRJobConfig.TASK_MAP_PROFILE_PARAMS, ""));
-        }
-        else {
-          vargs.add(conf.get(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, ""));
-        }
-        
+        final String profileParams = conf.get(task.isMapTask()
+            ? MRJobConfig.TASK_MAP_PROFILE_PARAMS
+            : MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, conf.getProfileParams());
+        vargs.add(String.format(profileParams,
+            getTaskLogFile(TaskLog.LogName.PROFILE)));
       }
     }
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1559201&r1=1559200&r2=1559201&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Fri Jan 17 17:43:00 2014
@@ -603,6 +603,31 @@
   </property>
 
   <property>
+    <name>mapreduce.task.profile.params</name>
+    <value></value>
+    <description>JVM profiler parameters used to profile map and reduce task
+      attempts. This string may contain a single format specifier %s that will
+      be replaced by the path to profile.out in the task attempt log directory.
+      To specify different profiling options for map tasks and reduce tasks,
+      more specific parameters mapreduce.task.profile.map.params and
+      mapreduce.task.profile.reduce.params should be used.</description>
+  </property>
+
+  <property>
+    <name>mapreduce.task.profile.map.params</name>
+    <value>${mapreduce.task.profile.params}</value>
+    <description>Map-task-specific JVM profiler parameters. See
+      mapreduce.task.profile.params</description>
+  </property>
+
+  <property>
+    <name>mapreduce.task.profile.reduce.params</name>
+    <value>${mapreduce.task.profile.params}</value>
+    <description>Reduce-task-specific JVM profiler parameters. See
+      mapreduce.task.profile.params</description>
+  </property>
+
+  <property>
     <name>mapreduce.task.skip.start.attempts</name>
     <value>2</value>
     <description> The number of Task attempts AFTER which skip mode 

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java?rev=1559201&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java Fri Jan 17 17:43:00 2014
@@ -0,0 +1,244 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2;
+
+import java.io.*;
+import java.util.*;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.SleepJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestMRJobsWithProfiler {
+
+  private static final Log LOG =
+    LogFactory.getLog(TestMRJobsWithProfiler.class);
+
+  private static final EnumSet<RMAppState> TERMINAL_RM_APP_STATES =
+    EnumSet.of(RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED);
+
+  private static MiniMRYarnCluster mrCluster;
+
+  private static final Configuration CONF = new Configuration();
+  private static final FileSystem localFs;
+  static {
+    try {
+      localFs = FileSystem.getLocal(CONF);
+    } catch (IOException io) {
+      throw new RuntimeException("problem getting local fs", io);
+    }
+  }
+
+  private static final Path TEST_ROOT_DIR =
+    new Path("target",  TestMRJobs.class.getName() + "-tmpDir").
+      makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+
+  private static final Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
+
+  @Before
+  public void setup() throws InterruptedException, IOException {
+
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+               + " not found. Not running test.");
+      return;
+    }
+
+    if (mrCluster == null) {
+      mrCluster = new MiniMRYarnCluster(getClass().getName());
+      mrCluster.init(CONF);
+      mrCluster.start();
+    }
+
+    // Copy MRAppJar and make it private. TODO: FIXME. This is a hack to
+    // work around the absent public distributed cache.
+    localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
+    localFs.setPermission(APP_JAR, new FsPermission("700"));
+  }
+
+  @After
+  public void tearDown() {
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+          + " not found. Not running test.");
+      return;
+    }
+
+    if (mrCluster != null) {
+      mrCluster.stop();
+    }
+  }
+
+
+  @Test (timeout = 120000)
+  public void testProfiler() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+        + " not found. Not running test.");
+      return;
+    }
+
+    final SleepJob sleepJob = new SleepJob();
+    final JobConf sleepConf = new JobConf(mrCluster.getConfig());
+
+    sleepConf.setProfileEnabled(true);
+    // profile map split 1
+    sleepConf.setProfileTaskRange(true, "1");
+    // profile the reduce task for map output partition 1
+    sleepConf.setProfileTaskRange(false, "1");
+
+    // use hprof for map to profile.out
+    sleepConf.set(MRJobConfig.TASK_MAP_PROFILE_PARAMS,
+        "-agentlib:hprof=cpu=times,heap=sites,force=n,thread=y,verbose=n,"
+      + "file=%s");
+
+    // use Xprof for reduce to stdout
+    sleepConf.set(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, "-Xprof");
+    sleepJob.setConf(sleepConf);
+
+    // 2-map-2-reduce SleepJob
+    final Job job = sleepJob.createJob(2, 2, 500, 1, 500, 1);
+    job.setJarByClass(SleepJob.class);
+    job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+    job.waitForCompletion(true);
+    final JobId jobId = TypeConverter.toYarn(job.getJobID());
+    final ApplicationId appID = jobId.getAppId();
+    int pollElapsed = 0;
+    while (true) {
+      Thread.sleep(1000);
+      pollElapsed += 1000;
+
+      if (TERMINAL_RM_APP_STATES.contains(
+        mrCluster.getResourceManager().getRMContext().getRMApps().get(appID)
+          .getState())) {
+        break;
+      }
+
+      if (pollElapsed >= 60000) {
+        LOG.warn("application did not reach terminal state within 60 seconds");
+        break;
+      }
+    }
+    Assert.assertEquals(RMAppState.FINISHED, mrCluster.getResourceManager()
+      .getRMContext().getRMApps().get(appID).getState());
+
+    // Job finished, verify logs
+    //
+    final Configuration nmConf = mrCluster.getNodeManager(0).getConfig();
+
+    final String appIdStr = appID.toString();
+    final String appIdSuffix = appIdStr.substring(
+      "application_".length(), appIdStr.length());
+    final String containerGlob = "container_" + appIdSuffix + "_*_*";
+
+    final Map<TaskAttemptID,Path> taLogDirs = new HashMap<TaskAttemptID,Path>();
+    final Pattern taskPattern = Pattern.compile(
+        ".*Task:(attempt_"
+      + appIdSuffix + "_[rm]_" + "[0-9]+_[0-9]+).*");
+    for (String logDir :
+         nmConf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS))
+    {
+      // filter out MRAppMaster and create attemptId->logDir map
+      //
+      for (FileStatus fileStatus :
+          localFs.globStatus(new Path(logDir
+            + Path.SEPARATOR + appIdStr
+            + Path.SEPARATOR + containerGlob
+            + Path.SEPARATOR + TaskLog.LogName.SYSLOG)))
+      {
+        final BufferedReader br = new BufferedReader(
+          new InputStreamReader(localFs.open(fileStatus.getPath())));
+        String line;
+        while ((line = br.readLine()) != null) {
+          final Matcher m = taskPattern.matcher(line);
+          if (m.matches()) {
+            // found Task done message
+            taLogDirs.put(TaskAttemptID.forName(m.group(1)),
+              fileStatus.getPath().getParent());
+            break;
+          }
+        }
+        br.close();
+      }
+    }
+
+    Assert.assertEquals(4, taLogDirs.size());  // all 4 attempts found
+
+    for (Map.Entry<TaskAttemptID,Path> dirEntry : taLogDirs.entrySet()) {
+      final TaskAttemptID tid = dirEntry.getKey();
+      final Path profilePath = new Path(dirEntry.getValue(),
+        TaskLog.LogName.PROFILE.toString());
+      final Path stdoutPath = new Path(dirEntry.getValue(),
+        TaskLog.LogName.STDOUT.toString());
+      if (tid.getTaskType() == TaskType.MAP) {
+        if (tid.getTaskID().getId() == 1) {
+          // verify profile.out
+          final BufferedReader br = new BufferedReader(new InputStreamReader(
+            localFs.open(profilePath)));
+          final String line = br.readLine();
+          Assert.assertTrue("No hprof content found!",
+            line !=null && line.startsWith("JAVA PROFILE"));
+          br.close();
+          Assert.assertEquals(0L, localFs.getFileStatus(stdoutPath).getLen());
+        } else {
+          Assert.assertFalse("hprof file should not exist",
+            localFs.exists(profilePath));
+        }
+      } else {
+        Assert.assertFalse("hprof file should not exist",
+          localFs.exists(profilePath));
+        if (tid.getTaskID().getId() == 1) {
+          final BufferedReader br = new BufferedReader(new InputStreamReader(
+            localFs.open(stdoutPath)));
+          boolean flatProfFound = false;
+          String line;
+          while ((line = br.readLine()) != null) {
+            if (line.startsWith("Flat profile")) {
+              flatProfFound = true;
+              break;
+            }
+          }
+          br.close();
+          Assert.assertTrue("Xprof flat profile not found!", flatProfFound);
+        } else {
+          Assert.assertEquals(0L, localFs.getFileStatus(stdoutPath).getLen());
+        }
+      }
+    }
+  }
+}