You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2015/03/04 23:46:09 UTC
hadoop git commit: MAPREDUCE-6267. Refactor
JobSubmitter#copyAndConfigureFiles into its own class. (Chris Trezzo via
kasha)
Repository: hadoop
Updated Branches:
refs/heads/trunk ed70fa142 -> c66c3ac6b
MAPREDUCE-6267. Refactor JobSubmitter#copyAndConfigureFiles into its own class. (Chris Trezzo via kasha)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c66c3ac6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c66c3ac6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c66c3ac6
Branch: refs/heads/trunk
Commit: c66c3ac6bf9f63177279feec3f2917e4b882e2bc
Parents: ed70fa1
Author: Karthik Kambatla <ka...@apache.org>
Authored: Wed Mar 4 14:42:07 2015 -0800
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Wed Mar 4 14:42:07 2015 -0800
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 +
.../java/org/apache/hadoop/mapreduce/Job.java | 1 +
.../hadoop/mapreduce/JobResourceUploader.java | 363 +++++++++++++++++++
.../apache/hadoop/mapreduce/JobSubmitter.java | 312 +---------------
4 files changed, 370 insertions(+), 309 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c66c3ac6/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index b2ae9d9..212727e 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -323,6 +323,9 @@ Release 2.7.0 - UNRELEASED
MAPREDUCE-6248. Exposed the internal MapReduce job's information as a public
API in DistCp. (Jing Zhao via vinodkv)
+ MAPREDUCE-6267. Refactor JobSubmitter#copyAndConfigureFiles into it's own
+ class. (Chris Trezzo via kasha)
+
OPTIMIZATIONS
MAPREDUCE-6169. MergeQueue should release reference to the current item
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c66c3ac6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
index f404175..9eea4cc 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
@@ -98,6 +98,7 @@ public class Job extends JobContextImpl implements JobContext {
"mapreduce.client.genericoptionsparser.used";
public static final String SUBMIT_REPLICATION =
"mapreduce.client.submit.file.replication";
+ public static final int DEFAULT_SUBMIT_REPLICATION = 10;
@InterfaceStability.Evolving
public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c66c3ac6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
new file mode 100644
index 0000000..eebdf88
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class JobResourceUploader {
+ protected static final Log LOG = LogFactory.getLog(JobResourceUploader.class);
+ private FileSystem jtFs;
+
+ JobResourceUploader(FileSystem submitFs) {
+ this.jtFs = submitFs;
+ }
+
+ /**
+ * Upload and configure files, libjars, jobjars, and archives pertaining to
+ * the passed job.
+ *
+ * @param job the job containing the files to be uploaded
+ * @param submitJobDir the submission directory of the job
+ * @throws IOException
+ */
+ public void uploadFiles(Job job, Path submitJobDir) throws IOException {
+ Configuration conf = job.getConfiguration();
+ short replication =
+ (short) conf.getInt(Job.SUBMIT_REPLICATION,
+ Job.DEFAULT_SUBMIT_REPLICATION);
+
+ if (!(conf.getBoolean(Job.USED_GENERIC_PARSER, false))) {
+ LOG.warn("Hadoop command-line option parsing not performed. "
+ + "Implement the Tool interface and execute your application "
+ + "with ToolRunner to remedy this.");
+ }
+
+ // get all the command line arguments passed in by the user conf
+ String files = conf.get("tmpfiles");
+ String libjars = conf.get("tmpjars");
+ String archives = conf.get("tmparchives");
+ String jobJar = job.getJar();
+
+ //
+ // Figure out what fs the JobTracker is using. Copy the
+ // job to it, under a temporary name. This allows DFS to work,
+ // and under the local fs also provides UNIX-like object loading
+ // semantics. (that is, if the job file is deleted right after
+ // submission, we can still run the submission to completion)
+ //
+
+ // Create a number of filenames in the JobTracker's fs namespace
+ LOG.debug("default FileSystem: " + jtFs.getUri());
+ if (jtFs.exists(submitJobDir)) {
+ throw new IOException("Not submitting job. Job directory " + submitJobDir
+ + " already exists!! This is unexpected.Please check what's there in"
+ + " that directory");
+ }
+ submitJobDir = jtFs.makeQualified(submitJobDir);
+ submitJobDir = new Path(submitJobDir.toUri().getPath());
+ FsPermission mapredSysPerms =
+ new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
+ FileSystem.mkdirs(jtFs, submitJobDir, mapredSysPerms);
+ Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
+ Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
+ Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
+ // add all the command line files/ jars and archive
+ // first copy them to jobtrackers filesystem
+
+ if (files != null) {
+ FileSystem.mkdirs(jtFs, filesDir, mapredSysPerms);
+ String[] fileArr = files.split(",");
+ for (String tmpFile : fileArr) {
+ URI tmpURI = null;
+ try {
+ tmpURI = new URI(tmpFile);
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ Path tmp = new Path(tmpURI);
+ Path newPath = copyRemoteFiles(filesDir, tmp, conf, replication);
+ try {
+ URI pathURI = getPathURI(newPath, tmpURI.getFragment());
+ DistributedCache.addCacheFile(pathURI, conf);
+ } catch (URISyntaxException ue) {
+ // should not throw a uri exception
+ throw new IOException("Failed to create uri for " + tmpFile, ue);
+ }
+ }
+ }
+
+ if (libjars != null) {
+ FileSystem.mkdirs(jtFs, libjarsDir, mapredSysPerms);
+ String[] libjarsArr = libjars.split(",");
+ for (String tmpjars : libjarsArr) {
+ Path tmp = new Path(tmpjars);
+ Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication);
+ DistributedCache.addFileToClassPath(
+ new Path(newPath.toUri().getPath()), conf);
+ }
+ }
+
+ if (archives != null) {
+ FileSystem.mkdirs(jtFs, archivesDir, mapredSysPerms);
+ String[] archivesArr = archives.split(",");
+ for (String tmpArchives : archivesArr) {
+ URI tmpURI;
+ try {
+ tmpURI = new URI(tmpArchives);
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ Path tmp = new Path(tmpURI);
+ Path newPath = copyRemoteFiles(archivesDir, tmp, conf, replication);
+ try {
+ URI pathURI = getPathURI(newPath, tmpURI.getFragment());
+ DistributedCache.addCacheArchive(pathURI, conf);
+ } catch (URISyntaxException ue) {
+ // should not throw an uri excpetion
+ throw new IOException("Failed to create uri for " + tmpArchives, ue);
+ }
+ }
+ }
+
+ if (jobJar != null) { // copy jar to JobTracker's fs
+ // use jar name if job is not named.
+ if ("".equals(job.getJobName())) {
+ job.setJobName(new Path(jobJar).getName());
+ }
+ Path jobJarPath = new Path(jobJar);
+ URI jobJarURI = jobJarPath.toUri();
+ // If the job jar is already in a global fs,
+ // we don't need to copy it from local fs
+ if (jobJarURI.getScheme() == null || jobJarURI.getScheme().equals("file")) {
+ copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir),
+ replication);
+ job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
+ }
+ } else {
+ LOG.warn("No job jar file set. User classes may not be found. "
+ + "See Job or Job#setJar(String).");
+ }
+
+ addLog4jToDistributedCache(job, submitJobDir);
+
+ // set the timestamps of the archives and files
+ // set the public/private visibility of the archives and files
+ ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf);
+ // get DelegationToken for cached file
+ ClientDistributedCacheManager.getDelegationTokens(conf,
+ job.getCredentials());
+ }
+
+ // copies a file to the jobtracker filesystem and returns the path where it
+ // was copied to
+ private Path copyRemoteFiles(Path parentDir, Path originalPath,
+ Configuration conf, short replication) throws IOException {
+ // check if we do not need to copy the files
+ // is jt using the same file system.
+ // just checking for uri strings... doing no dns lookups
+ // to see if the filesystems are the same. This is not optimal.
+ // but avoids name resolution.
+
+ FileSystem remoteFs = null;
+ remoteFs = originalPath.getFileSystem(conf);
+ if (compareFs(remoteFs, jtFs)) {
+ return originalPath;
+ }
+ // this might have name collisions. copy will throw an exception
+ // parse the original path to create new path
+ Path newPath = new Path(parentDir, originalPath.getName());
+ FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, conf);
+ jtFs.setReplication(newPath, replication);
+ return newPath;
+ }
+
+ /*
+ * see if two file systems are the same or not.
+ */
+ private boolean compareFs(FileSystem srcFs, FileSystem destFs) {
+ URI srcUri = srcFs.getUri();
+ URI dstUri = destFs.getUri();
+ if (srcUri.getScheme() == null) {
+ return false;
+ }
+ if (!srcUri.getScheme().equals(dstUri.getScheme())) {
+ return false;
+ }
+ String srcHost = srcUri.getHost();
+ String dstHost = dstUri.getHost();
+ if ((srcHost != null) && (dstHost != null)) {
+ try {
+ srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
+ dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
+ } catch (UnknownHostException ue) {
+ return false;
+ }
+ if (!srcHost.equals(dstHost)) {
+ return false;
+ }
+ } else if (srcHost == null && dstHost != null) {
+ return false;
+ } else if (srcHost != null && dstHost == null) {
+ return false;
+ }
+ // check for ports
+ if (srcUri.getPort() != dstUri.getPort()) {
+ return false;
+ }
+ return true;
+ }
+
+ private void copyJar(Path originalJarPath, Path submitJarFile,
+ short replication) throws IOException {
+ jtFs.copyFromLocalFile(originalJarPath, submitJarFile);
+ jtFs.setReplication(submitJarFile, replication);
+ jtFs.setPermission(submitJarFile, new FsPermission(
+ JobSubmissionFiles.JOB_FILE_PERMISSION));
+ }
+
+ private void addLog4jToDistributedCache(Job job, Path jobSubmitDir)
+ throws IOException {
+ Configuration conf = job.getConfiguration();
+ String log4jPropertyFile =
+ conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE, "");
+ if (!log4jPropertyFile.isEmpty()) {
+ short replication = (short) conf.getInt(Job.SUBMIT_REPLICATION, 10);
+ copyLog4jPropertyFile(job, jobSubmitDir, replication);
+ }
+ }
+
+ private URI getPathURI(Path destPath, String fragment)
+ throws URISyntaxException {
+ URI pathURI = destPath.toUri();
+ if (pathURI.getFragment() == null) {
+ if (fragment == null) {
+ pathURI = new URI(pathURI.toString() + "#" + destPath.getName());
+ } else {
+ pathURI = new URI(pathURI.toString() + "#" + fragment);
+ }
+ }
+ return pathURI;
+ }
+
+ // copy user specified log4j.property file in local
+ // to HDFS with putting on distributed cache and adding its parent directory
+ // to classpath.
+ @SuppressWarnings("deprecation")
+ private void copyLog4jPropertyFile(Job job, Path submitJobDir,
+ short replication) throws IOException {
+ Configuration conf = job.getConfiguration();
+
+ String file =
+ validateFilePath(
+ conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE), conf);
+ LOG.debug("default FileSystem: " + jtFs.getUri());
+ FsPermission mapredSysPerms =
+ new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
+ if (!jtFs.exists(submitJobDir)) {
+ throw new IOException("Cannot find job submission directory! "
+ + "It should just be created, so something wrong here.");
+ }
+
+ Path fileDir = JobSubmissionFiles.getJobLog4jFile(submitJobDir);
+
+ // first copy local log4j.properties file to HDFS under submitJobDir
+ if (file != null) {
+ FileSystem.mkdirs(jtFs, fileDir, mapredSysPerms);
+ URI tmpURI = null;
+ try {
+ tmpURI = new URI(file);
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ Path tmp = new Path(tmpURI);
+ Path newPath = copyRemoteFiles(fileDir, tmp, conf, replication);
+ DistributedCache.addFileToClassPath(new Path(newPath.toUri().getPath()),
+ conf);
+ }
+ }
+
+ /**
+ * takes input as a path string for file and verifies if it exist. It defaults
+ * for file:/// if the files specified do not have a scheme. it returns the
+ * paths uri converted defaulting to file:///. So an input of /home/user/file1
+ * would return file:///home/user/file1
+ *
+ * @param file
+ * @param conf
+ * @return
+ */
+ private String validateFilePath(String file, Configuration conf)
+ throws IOException {
+ if (file == null) {
+ return null;
+ }
+ if (file.isEmpty()) {
+ throw new IllegalArgumentException("File name can't be empty string");
+ }
+ String finalPath;
+ URI pathURI;
+ try {
+ pathURI = new URI(file);
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ Path path = new Path(pathURI);
+ FileSystem localFs = FileSystem.getLocal(conf);
+ if (pathURI.getScheme() == null) {
+ // default to the local file system
+ // check if the file exists or not first
+ if (!localFs.exists(path)) {
+ throw new FileNotFoundException("File " + file + " does not exist.");
+ }
+ finalPath =
+ path.makeQualified(localFs.getUri(), localFs.getWorkingDirectory())
+ .toString();
+ } else {
+ // check if the file exists in this file system
+ // we need to recreate this filesystem object to copy
+ // these files to the file system ResourceManager is running
+ // on.
+ FileSystem fs = path.getFileSystem(conf);
+ if (!fs.exists(path)) {
+ throw new FileNotFoundException("File " + file + " does not exist.");
+ }
+ finalPath =
+ path.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString();
+ }
+ return finalPath;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c66c3ac6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
index 75357f7..30a87c7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
@@ -86,297 +86,6 @@ class JobSubmitter {
this.submitClient = submitClient;
this.jtFs = submitFs;
}
- /*
- * see if two file systems are the same or not.
- */
- private boolean compareFs(FileSystem srcFs, FileSystem destFs) {
- URI srcUri = srcFs.getUri();
- URI dstUri = destFs.getUri();
- if (srcUri.getScheme() == null) {
- return false;
- }
- if (!srcUri.getScheme().equals(dstUri.getScheme())) {
- return false;
- }
- String srcHost = srcUri.getHost();
- String dstHost = dstUri.getHost();
- if ((srcHost != null) && (dstHost != null)) {
- try {
- srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
- dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
- } catch(UnknownHostException ue) {
- return false;
- }
- if (!srcHost.equals(dstHost)) {
- return false;
- }
- } else if (srcHost == null && dstHost != null) {
- return false;
- } else if (srcHost != null && dstHost == null) {
- return false;
- }
- //check for ports
- if (srcUri.getPort() != dstUri.getPort()) {
- return false;
- }
- return true;
- }
-
- // copies a file to the jobtracker filesystem and returns the path where it
- // was copied to
- private Path copyRemoteFiles(Path parentDir,
- Path originalPath, Configuration conf, short replication)
- throws IOException {
- //check if we do not need to copy the files
- // is jt using the same file system.
- // just checking for uri strings... doing no dns lookups
- // to see if the filesystems are the same. This is not optimal.
- // but avoids name resolution.
-
- FileSystem remoteFs = null;
- remoteFs = originalPath.getFileSystem(conf);
- if (compareFs(remoteFs, jtFs)) {
- return originalPath;
- }
- // this might have name collisions. copy will throw an exception
- //parse the original path to create new path
- Path newPath = new Path(parentDir, originalPath.getName());
- FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, conf);
- jtFs.setReplication(newPath, replication);
- return newPath;
- }
-
- // configures -files, -libjars and -archives.
- private void copyAndConfigureFiles(Job job, Path submitJobDir,
- short replication) throws IOException {
- Configuration conf = job.getConfiguration();
- if (!(conf.getBoolean(Job.USED_GENERIC_PARSER, false))) {
- LOG.warn("Hadoop command-line option parsing not performed. " +
- "Implement the Tool interface and execute your application " +
- "with ToolRunner to remedy this.");
- }
-
- // get all the command line arguments passed in by the user conf
- String files = conf.get("tmpfiles");
- String libjars = conf.get("tmpjars");
- String archives = conf.get("tmparchives");
- String jobJar = job.getJar();
-
- //
- // Figure out what fs the JobTracker is using. Copy the
- // job to it, under a temporary name. This allows DFS to work,
- // and under the local fs also provides UNIX-like object loading
- // semantics. (that is, if the job file is deleted right after
- // submission, we can still run the submission to completion)
- //
-
- // Create a number of filenames in the JobTracker's fs namespace
- LOG.debug("default FileSystem: " + jtFs.getUri());
- if (jtFs.exists(submitJobDir)) {
- throw new IOException("Not submitting job. Job directory " + submitJobDir
- +" already exists!! This is unexpected.Please check what's there in" +
- " that directory");
- }
- submitJobDir = jtFs.makeQualified(submitJobDir);
- submitJobDir = new Path(submitJobDir.toUri().getPath());
- FsPermission mapredSysPerms = new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
- FileSystem.mkdirs(jtFs, submitJobDir, mapredSysPerms);
- Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
- Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
- Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
- // add all the command line files/ jars and archive
- // first copy them to jobtrackers filesystem
-
- if (files != null) {
- FileSystem.mkdirs(jtFs, filesDir, mapredSysPerms);
- String[] fileArr = files.split(",");
- for (String tmpFile: fileArr) {
- URI tmpURI = null;
- try {
- tmpURI = new URI(tmpFile);
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException(e);
- }
- Path tmp = new Path(tmpURI);
- Path newPath = copyRemoteFiles(filesDir, tmp, conf, replication);
- try {
- URI pathURI = getPathURI(newPath, tmpURI.getFragment());
- DistributedCache.addCacheFile(pathURI, conf);
- } catch(URISyntaxException ue) {
- //should not throw a uri exception
- throw new IOException("Failed to create uri for " + tmpFile, ue);
- }
- }
- }
-
- if (libjars != null) {
- FileSystem.mkdirs(jtFs, libjarsDir, mapredSysPerms);
- String[] libjarsArr = libjars.split(",");
- for (String tmpjars: libjarsArr) {
- Path tmp = new Path(tmpjars);
- Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication);
- DistributedCache.addFileToClassPath(
- new Path(newPath.toUri().getPath()), conf);
- }
- }
-
- if (archives != null) {
- FileSystem.mkdirs(jtFs, archivesDir, mapredSysPerms);
- String[] archivesArr = archives.split(",");
- for (String tmpArchives: archivesArr) {
- URI tmpURI;
- try {
- tmpURI = new URI(tmpArchives);
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException(e);
- }
- Path tmp = new Path(tmpURI);
- Path newPath = copyRemoteFiles(archivesDir, tmp, conf,
- replication);
- try {
- URI pathURI = getPathURI(newPath, tmpURI.getFragment());
- DistributedCache.addCacheArchive(pathURI, conf);
- } catch(URISyntaxException ue) {
- //should not throw an uri excpetion
- throw new IOException("Failed to create uri for " + tmpArchives, ue);
- }
- }
- }
-
- if (jobJar != null) { // copy jar to JobTracker's fs
- // use jar name if job is not named.
- if ("".equals(job.getJobName())){
- job.setJobName(new Path(jobJar).getName());
- }
- Path jobJarPath = new Path(jobJar);
- URI jobJarURI = jobJarPath.toUri();
- // If the job jar is already in a global fs,
- // we don't need to copy it from local fs
- if ( jobJarURI.getScheme() == null
- || jobJarURI.getScheme().equals("file")) {
- copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir),
- replication);
- job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
- }
- } else {
- LOG.warn("No job jar file set. User classes may not be found. "+
- "See Job or Job#setJar(String).");
- }
-
- addLog4jToDistributedCache(job, submitJobDir);
-
- // set the timestamps of the archives and files
- // set the public/private visibility of the archives and files
- ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf);
- // get DelegationToken for cached file
- ClientDistributedCacheManager.getDelegationTokens(conf, job
- .getCredentials());
- }
-
- // copy user specified log4j.property file in local
- // to HDFS with putting on distributed cache and adding its parent directory
- // to classpath.
- @SuppressWarnings("deprecation")
- private void copyLog4jPropertyFile(Job job, Path submitJobDir,
- short replication) throws IOException {
- Configuration conf = job.getConfiguration();
-
- String file = validateFilePath(
- conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE), conf);
- LOG.debug("default FileSystem: " + jtFs.getUri());
- FsPermission mapredSysPerms =
- new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
- if (!jtFs.exists(submitJobDir)) {
- throw new IOException("Cannot find job submission directory! "
- + "It should just be created, so something wrong here.");
- }
-
- Path fileDir = JobSubmissionFiles.getJobLog4jFile(submitJobDir);
-
- // first copy local log4j.properties file to HDFS under submitJobDir
- if (file != null) {
- FileSystem.mkdirs(jtFs, fileDir, mapredSysPerms);
- URI tmpURI = null;
- try {
- tmpURI = new URI(file);
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException(e);
- }
- Path tmp = new Path(tmpURI);
- Path newPath = copyRemoteFiles(fileDir, tmp, conf, replication);
- DistributedCache.addFileToClassPath(new Path(newPath.toUri().getPath()), conf);
- }
- }
-
- /**
- * takes input as a path string for file and verifies if it exist.
- * It defaults for file:/// if the files specified do not have a scheme.
- * it returns the paths uri converted defaulting to file:///.
- * So an input of /home/user/file1 would return file:///home/user/file1
- * @param file
- * @param conf
- * @return
- */
- private String validateFilePath(String file, Configuration conf)
- throws IOException {
- if (file == null) {
- return null;
- }
- if (file.isEmpty()) {
- throw new IllegalArgumentException("File name can't be empty string");
- }
- String finalPath;
- URI pathURI;
- try {
- pathURI = new URI(file);
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException(e);
- }
- Path path = new Path(pathURI);
- FileSystem localFs = FileSystem.getLocal(conf);
- if (pathURI.getScheme() == null) {
- //default to the local file system
- //check if the file exists or not first
- if (!localFs.exists(path)) {
- throw new FileNotFoundException("File " + file + " does not exist.");
- }
- finalPath = path.makeQualified(localFs.getUri(),
- localFs.getWorkingDirectory()).toString();
- }
- else {
- // check if the file exists in this file system
- // we need to recreate this filesystem object to copy
- // these files to the file system ResourceManager is running
- // on.
- FileSystem fs = path.getFileSystem(conf);
- if (!fs.exists(path)) {
- throw new FileNotFoundException("File " + file + " does not exist.");
- }
- finalPath = path.makeQualified(fs.getUri(),
- fs.getWorkingDirectory()).toString();
- }
- return finalPath;
- }
-
- private URI getPathURI(Path destPath, String fragment)
- throws URISyntaxException {
- URI pathURI = destPath.toUri();
- if (pathURI.getFragment() == null) {
- if (fragment == null) {
- pathURI = new URI(pathURI.toString() + "#" + destPath.getName());
- } else {
- pathURI = new URI(pathURI.toString() + "#" + fragment);
- }
- }
- return pathURI;
- }
-
- private void copyJar(Path originalJarPath, Path submitJarFile,
- short replication) throws IOException {
- jtFs.copyFromLocalFile(originalJarPath, submitJarFile);
- jtFs.setReplication(submitJarFile, replication);
- jtFs.setPermission(submitJarFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
- }
/**
* configure the jobconf of the user with the command line options of
@@ -386,9 +95,8 @@ class JobSubmitter {
*/
private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
throws IOException {
- Configuration conf = job.getConfiguration();
- short replication = (short)conf.getInt(Job.SUBMIT_REPLICATION, 10);
- copyAndConfigureFiles(job, jobSubmitDir, replication);
+ JobResourceUploader rUploader = new JobResourceUploader(jtFs);
+ rUploader.uploadFiles(job, jobSubmitDir);
// Get the working directory. If not set, sets it to filesystem working dir
// This code has been added so that working directory reset before running
@@ -396,8 +104,8 @@ class JobSubmitter {
// might use the public API JobConf#setWorkingDirectory to reset the working
// directory.
job.getWorkingDirectory();
-
}
+
/**
* Internal method for submitting jobs to the system.
*
@@ -484,10 +192,7 @@ class JobSubmitter {
}
copyAndConfigureFiles(job, submitJobDir);
-
-
-
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Create the splits for the job
@@ -766,15 +471,4 @@ class JobSubmitter {
DistributedCache.addCacheArchive(uri, conf);
}
}
-
- private void addLog4jToDistributedCache(Job job,
- Path jobSubmitDir) throws IOException {
- Configuration conf = job.getConfiguration();
- String log4jPropertyFile =
- conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE, "");
- if (!log4jPropertyFile.isEmpty()) {
- short replication = (short)conf.getInt(Job.SUBMIT_REPLICATION, 10);
- copyLog4jPropertyFile(job, jobSubmitDir, replication);
- }
- }
}