You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2014/11/01 08:49:09 UTC
git commit: MAPREDUCE-6052. Supported overriding the default
container-log4j.properties file per job. Contributed by Junping Du.
Repository: hadoop
Updated Branches:
refs/heads/trunk 260ab6d5f -> ed63b1164
MAPREDUCE-6052. Supported overriding the default container-log4j.properties file per job. Contributed by Junping Du.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ed63b116
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ed63b116
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ed63b116
Branch: refs/heads/trunk
Commit: ed63b116465290fdb0acdf89170025f47b307599
Parents: 260ab6d
Author: Zhijie Shen <zj...@apache.org>
Authored: Sat Nov 1 00:47:57 2014 -0700
Committer: Zhijie Shen <zj...@apache.org>
Committed: Sat Nov 1 00:47:57 2014 -0700
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 +
.../apache/hadoop/mapred/MapReduceChildJVM.java | 7 +-
.../v2/app/job/impl/TestMapReduceChildJVM.java | 30 +++++
.../apache/hadoop/mapreduce/v2/util/MRApps.java | 20 +++-
.../hadoop/mapreduce/JobSubmissionFiles.java | 8 ++
.../apache/hadoop/mapreduce/JobSubmitter.java | 114 ++++++++++++++++++-
.../apache/hadoop/mapreduce/MRJobConfig.java | 3 +
.../org/apache/hadoop/mapred/YARNRunner.java | 2 +-
8 files changed, 178 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed63b116/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 2c06aa3..deb2311 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -283,6 +283,9 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-6018. Added an MR specific config to enable emitting job history
data to the timeline server. (Robert Kanter via zjshen)
+ MAPREDUCE-6052. Supported overriding the default container-log4j.properties
+ file per job. (Junping Du via zjshen)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed63b116/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
index 2d91b5d..c790c57 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Vector;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TaskLog.LogName;
import org.apache.hadoop.mapreduce.ID;
@@ -148,11 +149,11 @@ public class MapReduceChildJVM {
private static void setupLog4jProperties(Task task,
Vector<String> vargs,
- long logSize) {
+ long logSize, Configuration conf) {
String logLevel = getChildLogLevel(task.conf, task.isMapTask());
int numBackups = task.conf.getInt(MRJobConfig.TASK_LOG_BACKUPS,
MRJobConfig.DEFAULT_TASK_LOG_BACKUPS);
- MRApps.addLog4jSystemProperties(logLevel, logSize, numBackups, vargs);
+ MRApps.addLog4jSystemProperties(logLevel, logSize, numBackups, vargs, conf);
}
public static List<String> getVMCommand(
@@ -208,7 +209,7 @@ public class MapReduceChildJVM {
// Setup the log4j prop
long logSize = TaskLog.getTaskLogLength(conf);
- setupLog4jProperties(task, vargs, logSize);
+ setupLog4jProperties(task, vargs, logSize, conf);
if (conf.getProfileEnabled()) {
if (conf.getProfileTaskRange(task.isMapTask()
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed63b116/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java
index 8fdaded..8e146b9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java
@@ -77,6 +77,36 @@ public class TestMapReduceChildJVM {
app.cmdEnvironment.containsKey("HADOOP_CLIENT_OPTS"));
Assert.assertEquals("", app.cmdEnvironment.get("HADOOP_CLIENT_OPTS"));
}
+
+ @Test (timeout = 30000)
+ public void testCommandLineWithLog4JConifg() throws Exception {
+
+ MyMRApp app = new MyMRApp(1, 0, true, this.getClass().getName(), true);
+ Configuration conf = new Configuration();
+ conf.setBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, true);
+ String testLogPropertieFile = "test-log4j.properties";
+ String testLogPropertiePath = "../"+"test-log4j.properties";
+ conf.set(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE, testLogPropertiePath);
+ Job job = app.submit(conf);
+ app.waitForState(job, JobState.SUCCEEDED);
+ app.verifyCompleted();
+
+ Assert.assertEquals(
+ "[" + MRApps.crossPlatformify("JAVA_HOME") + "/bin/java" +
+ " -Djava.net.preferIPv4Stack=true" +
+ " -Dhadoop.metrics.log.level=WARN" +
+ " -Xmx200m -Djava.io.tmpdir=" + MRApps.crossPlatformify("PWD") + "/tmp" +
+ " -Dlog4j.configuration=" + testLogPropertieFile +
+ " -Dyarn.app.container.log.dir=<LOG_DIR>" +
+ " -Dyarn.app.container.log.filesize=0" +
+ " -Dhadoop.root.logger=INFO,CLA" +
+ " org.apache.hadoop.mapred.YarnChild 127.0.0.1" +
+ " 54321" +
+ " attempt_0_0000_m_000000_0" +
+ " 0" +
+ " 1><LOG_DIR>/stdout" +
+ " 2><LOG_DIR>/stderr ]", app.myCommandLine);
+ }
private static final class MyMRApp extends MRApp {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed63b116/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
index 113b445..4484e6a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
@@ -599,10 +599,26 @@ public class MRApps extends Apps {
* @param logSize See {@link ContainerLogAppender#setTotalLogFileSize(long)}
* @param numBackups See {@link RollingFileAppender#setMaxBackupIndex(int)}
* @param vargs the argument list to append to
+ * @param conf configuration of MR job
*/
public static void addLog4jSystemProperties(
- String logLevel, long logSize, int numBackups, List<String> vargs) {
- vargs.add("-Dlog4j.configuration=container-log4j.properties");
+ String logLevel, long logSize, int numBackups, List<String> vargs,
+ Configuration conf) {
+ String log4jPropertyFile =
+ conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE, "");
+ if (log4jPropertyFile.isEmpty()) {
+ vargs.add("-Dlog4j.configuration=container-log4j.properties");
+ } else {
+ URI log4jURI = null;
+ try {
+ log4jURI = new URI(log4jPropertyFile);
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ Path log4jPath = new Path(log4jURI);
+ vargs.add("-Dlog4j.configuration="+log4jPath.getName());
+ }
+
vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" +
ApplicationConstants.LOG_DIR_EXPANSION_VAR);
vargs.add(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed63b116/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
index a4ea1d8..516e661 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
@@ -75,6 +75,14 @@ public class JobSubmissionFiles {
public static Path getJobDistCacheFiles(Path jobSubmitDir) {
return new Path(jobSubmitDir, "files");
}
+
+ /**
+ * Get the job distributed cache path for log4j properties.
+ * @param jobSubmitDir
+ */
+ public static Path getJobLog4jFile(Path jobSubmitDir) {
+ return new Path(jobSubmitDir, "log4j");
+ }
/**
* Get the job distributed cache archives path.
* @param jobSubmitDir
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed63b116/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
index 59202ca..b76a734 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
@@ -262,15 +263,102 @@ class JobSubmitter {
LOG.warn("No job jar file set. User classes may not be found. "+
"See Job or Job#setJar(String).");
}
-
+
+ addLog4jToDistributedCache(job, submitJobDir);
+
// set the timestamps of the archives and files
// set the public/private visibility of the archives and files
ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf);
- // get DelegationToken for each cached file
+ // get DelegationToken for cached file
ClientDistributedCacheManager.getDelegationTokens(conf, job
.getCredentials());
}
+ // Copy the user-specified log4j properties file from the local file system
+ // to HDFS, put it on the distributed cache, and add its parent directory
+ // to the classpath.
+ @SuppressWarnings("deprecation")
+ private void copyLog4jPropertyFile(Job job, Path submitJobDir,
+ short replication) throws IOException {
+ Configuration conf = job.getConfiguration();
+
+ String file = validateFilePath(
+ conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE), conf);
+ LOG.debug("default FileSystem: " + jtFs.getUri());
+ FsPermission mapredSysPerms =
+ new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
+ if (!jtFs.exists(submitJobDir)) {
+ throw new IOException("Cannot find job submission directory! "
+ + "It should just be created, so something wrong here.");
+ }
+
+ Path fileDir = JobSubmissionFiles.getJobLog4jFile(submitJobDir);
+
+ // first copy local log4j.properties file to HDFS under submitJobDir
+ if (file != null) {
+ FileSystem.mkdirs(jtFs, fileDir, mapredSysPerms);
+ URI tmpURI = null;
+ try {
+ tmpURI = new URI(file);
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ Path tmp = new Path(tmpURI);
+ Path newPath = copyRemoteFiles(fileDir, tmp, conf, replication);
+ DistributedCache.addFileToClassPath(new Path(newPath.toUri().getPath()), conf);
+ }
+ }
+
+ /**
+ * Takes a path string for a file and verifies that the file exists.
+ * If the path has no scheme, it defaults to the local file system (file:///).
+ * Returns the fully qualified path, converted to a string.
+ * So an input of /home/user/file1 would return file:///home/user/file1
+ * @param file
+ * @param conf
+ * @return
+ */
+ private String validateFilePath(String file, Configuration conf)
+ throws IOException {
+ if (file == null) {
+ return null;
+ }
+ if (file.isEmpty()) {
+ throw new IllegalArgumentException("File name can't be empty string");
+ }
+ String finalPath;
+ URI pathURI;
+ try {
+ pathURI = new URI(file);
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ Path path = new Path(pathURI);
+ FileSystem localFs = FileSystem.getLocal(conf);
+ if (pathURI.getScheme() == null) {
+ //default to the local file system
+ //check if the file exists or not first
+ if (!localFs.exists(path)) {
+ throw new FileNotFoundException("File " + file + " does not exist.");
+ }
+ finalPath = path.makeQualified(localFs.getUri(),
+ localFs.getWorkingDirectory()).toString();
+ }
+ else {
+ // check if the file exists in this file system
+ // we need to recreate this filesystem object to copy
+ // these files to the file system ResourceManager is running
+ // on.
+ FileSystem fs = path.getFileSystem(conf);
+ if (!fs.exists(path)) {
+ throw new FileNotFoundException("File " + file + " does not exist.");
+ }
+ finalPath = path.makeQualified(fs.getUri(),
+ fs.getWorkingDirectory()).toString();
+ }
+ return finalPath;
+ }
+
private URI getPathURI(Path destPath, String fragment)
throws URISyntaxException {
URI pathURI = destPath.toUri();
@@ -305,7 +393,7 @@ class JobSubmitter {
// Set the working directory
if (job.getWorkingDirectory() == null) {
- job.setWorkingDirectory(jtFs.getWorkingDirectory());
+ job.setWorkingDirectory(jtFs.getWorkingDirectory());
}
}
@@ -395,6 +483,10 @@ class JobSubmitter {
}
copyAndConfigureFiles(job, submitJobDir);
+
+
+
+
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Create the splits for the job
@@ -673,4 +765,20 @@ class JobSubmitter {
DistributedCache.addCacheArchive(uri, conf);
}
}
+
+ private void addLog4jToDistributedCache(Job job,
+ Path jobSubmitDir) throws IOException {
+ Configuration conf = job.getConfiguration();
+ String log4jPropertyFile =
+ conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE, "");
+ if (!log4jPropertyFile.isEmpty()) {
+ short replication = (short)conf.getInt(Job.SUBMIT_REPLICATION, 10);
+ copyLog4jPropertyFile(job, jobSubmitDir, replication);
+
+ // Set the working directory
+ if (job.getWorkingDirectory() == null) {
+ job.setWorkingDirectory(jtFs.getWorkingDirectory());
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed63b116/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 5476e92..691074a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -730,6 +730,9 @@ public interface MRJobConfig {
*/
public static final String MAPREDUCE_APPLICATION_CLASSPATH =
"mapreduce.application.classpath";
+
+ public static final String MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE =
+ "mapreduce.job.log4j-properties-file";
/**
* Path to MapReduce framework archive
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed63b116/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index 40ef982..a1c4c32 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -398,7 +398,7 @@ public class YARNRunner implements ClientProtocol {
MRJobConfig.MR_AM_LOG_LEVEL, MRJobConfig.DEFAULT_MR_AM_LOG_LEVEL);
int numBackups = jobConf.getInt(MRJobConfig.MR_AM_LOG_BACKUPS,
MRJobConfig.DEFAULT_MR_AM_LOG_BACKUPS);
- MRApps.addLog4jSystemProperties(logLevel, logSize, numBackups, vargs);
+ MRApps.addLog4jSystemProperties(logLevel, logSize, numBackups, vargs, conf);
// Check for Java Lib Path usage in MAP and REDUCE configs
warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS,""), "map",