You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2014/11/01 08:49:09 UTC

git commit: MAPREDUCE-6052. Supported overriding the default container-log4j.properties file per job. Contributed by Junping Du.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 260ab6d5f -> ed63b1164


MAPREDUCE-6052. Supported overriding the default container-log4j.properties file per job. Contributed by Junping Du.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ed63b116
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ed63b116
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ed63b116

Branch: refs/heads/trunk
Commit: ed63b116465290fdb0acdf89170025f47b307599
Parents: 260ab6d
Author: Zhijie Shen <zj...@apache.org>
Authored: Sat Nov 1 00:47:57 2014 -0700
Committer: Zhijie Shen <zj...@apache.org>
Committed: Sat Nov 1 00:47:57 2014 -0700

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   3 +
 .../apache/hadoop/mapred/MapReduceChildJVM.java |   7 +-
 .../v2/app/job/impl/TestMapReduceChildJVM.java  |  30 +++++
 .../apache/hadoop/mapreduce/v2/util/MRApps.java |  20 +++-
 .../hadoop/mapreduce/JobSubmissionFiles.java    |   8 ++
 .../apache/hadoop/mapreduce/JobSubmitter.java   | 114 ++++++++++++++++++-
 .../apache/hadoop/mapreduce/MRJobConfig.java    |   3 +
 .../org/apache/hadoop/mapred/YARNRunner.java    |   2 +-
 8 files changed, 178 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed63b116/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 2c06aa3..deb2311 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -283,6 +283,9 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-6018. Added an MR specific config to enable emitting job history
     data to the timeline server. (Robert Kanter via zjshen)
 
+    MAPREDUCE-6052. Supported overriding the default container-log4j.properties
+    file per job. (Junping Du via zjshen)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed63b116/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
index 2d91b5d..c790c57 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Vector;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.TaskLog.LogName;
 import org.apache.hadoop.mapreduce.ID;
@@ -148,11 +149,11 @@ public class MapReduceChildJVM {
 
   private static void setupLog4jProperties(Task task,
       Vector<String> vargs,
-      long logSize) {
+      long logSize, Configuration conf) {
     String logLevel = getChildLogLevel(task.conf, task.isMapTask());
     int numBackups = task.conf.getInt(MRJobConfig.TASK_LOG_BACKUPS,
         MRJobConfig.DEFAULT_TASK_LOG_BACKUPS);
-    MRApps.addLog4jSystemProperties(logLevel, logSize, numBackups, vargs);
+    MRApps.addLog4jSystemProperties(logLevel, logSize, numBackups, vargs, conf);
   }
 
   public static List<String> getVMCommand(
@@ -208,7 +209,7 @@ public class MapReduceChildJVM {
 
     // Setup the log4j prop
     long logSize = TaskLog.getTaskLogLength(conf);
-    setupLog4jProperties(task, vargs, logSize);
+    setupLog4jProperties(task, vargs, logSize, conf);
 
     if (conf.getProfileEnabled()) {
       if (conf.getProfileTaskRange(task.isMapTask()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed63b116/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java
index 8fdaded..8e146b9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java
@@ -77,6 +77,36 @@ public class TestMapReduceChildJVM {
       app.cmdEnvironment.containsKey("HADOOP_CLIENT_OPTS"));
     Assert.assertEquals("", app.cmdEnvironment.get("HADOOP_CLIENT_OPTS"));
   }
+  
+  @Test (timeout = 30000)
+  public void testCommandLineWithLog4JConifg() throws Exception {
+
+    MyMRApp app = new MyMRApp(1, 0, true, this.getClass().getName(), true);
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, true);
+    String testLogPropertieFile = "test-log4j.properties";
+    String testLogPropertiePath = "../"+"test-log4j.properties";
+    conf.set(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE, testLogPropertiePath);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+
+    Assert.assertEquals(
+      "[" + MRApps.crossPlatformify("JAVA_HOME") + "/bin/java" +
+      " -Djava.net.preferIPv4Stack=true" +
+      " -Dhadoop.metrics.log.level=WARN" +
+      "  -Xmx200m -Djava.io.tmpdir=" + MRApps.crossPlatformify("PWD") + "/tmp" +
+      " -Dlog4j.configuration=" + testLogPropertieFile +
+      " -Dyarn.app.container.log.dir=<LOG_DIR>" +
+      " -Dyarn.app.container.log.filesize=0" +
+      " -Dhadoop.root.logger=INFO,CLA" +
+      " org.apache.hadoop.mapred.YarnChild 127.0.0.1" +
+      " 54321" +
+      " attempt_0_0000_m_000000_0" +
+      " 0" +
+      " 1><LOG_DIR>/stdout" +
+      " 2><LOG_DIR>/stderr ]", app.myCommandLine);
+  }
 
   private static final class MyMRApp extends MRApp {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed63b116/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
index 113b445..4484e6a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
@@ -599,10 +599,26 @@ public class MRApps extends Apps {
    * @param logSize See {@link ContainerLogAppender#setTotalLogFileSize(long)}
    * @param numBackups See {@link RollingFileAppender#setMaxBackupIndex(int)}
    * @param vargs the argument list to append to
+   * @param conf configuration of MR job
    */
   public static void addLog4jSystemProperties(
-      String logLevel, long logSize, int numBackups, List<String> vargs) {
-    vargs.add("-Dlog4j.configuration=container-log4j.properties");
+      String logLevel, long logSize, int numBackups, List<String> vargs, 
+      Configuration conf) {
+    String log4jPropertyFile =
+        conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE, "");
+    if (log4jPropertyFile.isEmpty()) {
+      vargs.add("-Dlog4j.configuration=container-log4j.properties");
+    } else {
+      URI log4jURI = null;
+      try {
+        log4jURI = new URI(log4jPropertyFile);
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e);
+      }
+      Path log4jPath = new Path(log4jURI);
+      vargs.add("-Dlog4j.configuration="+log4jPath.getName());
+    }
+    
     vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" +
         ApplicationConstants.LOG_DIR_EXPANSION_VAR);
     vargs.add(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed63b116/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
index a4ea1d8..516e661 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
@@ -75,6 +75,14 @@ public class JobSubmissionFiles {
   public static Path getJobDistCacheFiles(Path jobSubmitDir) {
     return new Path(jobSubmitDir, "files");
   }
+  
+  /**
+   * Get the job distributed cache path for log4j properties.
+   * @param jobSubmitDir
+   */
+  public static Path getJobLog4jFile(Path jobSubmitDir) {
+    return new Path(jobSubmitDir, "log4j");
+  }
   /**
    * Get the job distributed cache archives path.
    * @param jobSubmitDir 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed63b116/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
index 59202ca..b76a734 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.mapreduce;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.URI;
@@ -262,15 +263,102 @@ class JobSubmitter {
       LOG.warn("No job jar file set.  User classes may not be found. "+
       "See Job or Job#setJar(String).");
     }
-
+    
+    addLog4jToDistributedCache(job, submitJobDir);
+    
     //  set the timestamps of the archives and files
     //  set the public/private visibility of the archives and files
     ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf);
-    // get DelegationToken for each cached file
+    // get DelegationToken for cached file
     ClientDistributedCacheManager.getDelegationTokens(conf, job
         .getCredentials());
   }
   
+  // copy user specified log4j.property file in local 
+  // to HDFS with putting on distributed cache and adding its parent directory 
+  // to classpath.
+  @SuppressWarnings("deprecation")
+  private void copyLog4jPropertyFile(Job job, Path submitJobDir,
+      short replication) throws IOException {
+    Configuration conf = job.getConfiguration();
+
+    String file = validateFilePath(
+        conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE), conf);
+    LOG.debug("default FileSystem: " + jtFs.getUri());
+    FsPermission mapredSysPerms = 
+      new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
+    if (!jtFs.exists(submitJobDir)) {
+      throw new IOException("Cannot find job submission directory! " 
+          + "It should just be created, so something wrong here.");
+    }
+    
+    Path fileDir = JobSubmissionFiles.getJobLog4jFile(submitJobDir);
+
+    // first copy local log4j.properties file to HDFS under submitJobDir
+    if (file != null) {
+      FileSystem.mkdirs(jtFs, fileDir, mapredSysPerms);
+      URI tmpURI = null;
+      try {
+        tmpURI = new URI(file);
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e);
+      }
+      Path tmp = new Path(tmpURI);
+      Path newPath = copyRemoteFiles(fileDir, tmp, conf, replication);
+      DistributedCache.addFileToClassPath(new Path(newPath.toUri().getPath()), conf);
+    }
+  }
+  
+  /**
+   * takes input as a path string for file and verifies if it exist. 
+   * It defaults for file:/// if the files specified do not have a scheme.
+   * it returns the paths uri converted defaulting to file:///.
+   * So an input of  /home/user/file1 would return file:///home/user/file1
+   * @param file
+   * @param conf
+   * @return
+   */
+  private String validateFilePath(String file, Configuration conf) 
+      throws IOException  {
+    if (file == null) {
+      return null;
+    }
+    if (file.isEmpty()) {
+      throw new IllegalArgumentException("File name can't be empty string");
+    }
+    String finalPath;
+    URI pathURI;
+    try {
+      pathURI = new URI(file);
+    } catch (URISyntaxException e) {
+      throw new IllegalArgumentException(e);
+    }
+    Path path = new Path(pathURI);
+    FileSystem localFs = FileSystem.getLocal(conf);
+    if (pathURI.getScheme() == null) {
+      //default to the local file system
+      //check if the file exists or not first
+      if (!localFs.exists(path)) {
+        throw new FileNotFoundException("File " + file + " does not exist.");
+      }
+      finalPath = path.makeQualified(localFs.getUri(),
+          localFs.getWorkingDirectory()).toString();
+    }
+    else {
+      // check if the file exists in this file system
+      // we need to recreate this filesystem object to copy
+      // these files to the file system ResourceManager is running
+      // on.
+      FileSystem fs = path.getFileSystem(conf);
+      if (!fs.exists(path)) {
+        throw new FileNotFoundException("File " + file + " does not exist.");
+      }
+      finalPath = path.makeQualified(fs.getUri(),
+          fs.getWorkingDirectory()).toString();
+    }
+    return finalPath;
+  }
+  
   private URI getPathURI(Path destPath, String fragment) 
       throws URISyntaxException {
     URI pathURI = destPath.toUri();
@@ -305,7 +393,7 @@ class JobSubmitter {
 
     // Set the working directory
     if (job.getWorkingDirectory() == null) {
-      job.setWorkingDirectory(jtFs.getWorkingDirectory());          
+      job.setWorkingDirectory(jtFs.getWorkingDirectory());
     }
 
   }
@@ -395,6 +483,10 @@ class JobSubmitter {
       }
 
       copyAndConfigureFiles(job, submitJobDir);
+      
+      
+
+      
       Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
       
       // Create the splits for the job
@@ -673,4 +765,20 @@ class JobSubmitter {
       DistributedCache.addCacheArchive(uri, conf);
     }
   }
+  
+  private void addLog4jToDistributedCache(Job job,
+      Path jobSubmitDir) throws IOException {
+    Configuration conf = job.getConfiguration();
+    String log4jPropertyFile =
+        conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE, "");
+    if (!log4jPropertyFile.isEmpty()) {
+      short replication = (short)conf.getInt(Job.SUBMIT_REPLICATION, 10);
+      copyLog4jPropertyFile(job, jobSubmitDir, replication);
+
+      // Set the working directory
+      if (job.getWorkingDirectory() == null) {
+        job.setWorkingDirectory(jtFs.getWorkingDirectory());
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed63b116/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 5476e92..691074a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -730,6 +730,9 @@ public interface MRJobConfig {
    */
   public static final String MAPREDUCE_APPLICATION_CLASSPATH = 
       "mapreduce.application.classpath";
+  
+  public static final String MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE = 
+      "mapreduce.job.log4j-properties-file";
 
   /**
    * Path to MapReduce framework archive

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed63b116/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index 40ef982..a1c4c32 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -398,7 +398,7 @@ public class YARNRunner implements ClientProtocol {
         MRJobConfig.MR_AM_LOG_LEVEL, MRJobConfig.DEFAULT_MR_AM_LOG_LEVEL);
     int numBackups = jobConf.getInt(MRJobConfig.MR_AM_LOG_BACKUPS,
         MRJobConfig.DEFAULT_MR_AM_LOG_BACKUPS);
-    MRApps.addLog4jSystemProperties(logLevel, logSize, numBackups, vargs);
+    MRApps.addLog4jSystemProperties(logLevel, logSize, numBackups, vargs, conf);
 
     // Check for Java Lib Path usage in MAP and REDUCE configs
     warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS,""), "map",