You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2015/03/04 23:46:09 UTC

hadoop git commit: MAPREDUCE-6267. Refactor JobSubmitter#copyAndConfigureFiles into it's own class. (Chris Trezzo via kasha)

Repository: hadoop
Updated Branches:
  refs/heads/trunk ed70fa142 -> c66c3ac6b


MAPREDUCE-6267. Refactor JobSubmitter#copyAndConfigureFiles into it's own class. (Chris Trezzo via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c66c3ac6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c66c3ac6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c66c3ac6

Branch: refs/heads/trunk
Commit: c66c3ac6bf9f63177279feec3f2917e4b882e2bc
Parents: ed70fa1
Author: Karthik Kambatla <ka...@apache.org>
Authored: Wed Mar 4 14:42:07 2015 -0800
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Wed Mar 4 14:42:07 2015 -0800

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   3 +
 .../java/org/apache/hadoop/mapreduce/Job.java   |   1 +
 .../hadoop/mapreduce/JobResourceUploader.java   | 363 +++++++++++++++++++
 .../apache/hadoop/mapreduce/JobSubmitter.java   | 312 +---------------
 4 files changed, 370 insertions(+), 309 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c66c3ac6/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index b2ae9d9..212727e 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -323,6 +323,9 @@ Release 2.7.0 - UNRELEASED
     MAPREDUCE-6248. Exposed the internal MapReduce job's information as a public
     API in DistCp. (Jing Zhao via vinodkv)
 
+    MAPREDUCE-6267. Refactor JobSubmitter#copyAndConfigureFiles into it's own 
+    class. (Chris Trezzo via kasha)
+
   OPTIMIZATIONS
 
     MAPREDUCE-6169. MergeQueue should release reference to the current item 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c66c3ac6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
index f404175..9eea4cc 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
@@ -98,6 +98,7 @@ public class Job extends JobContextImpl implements JobContext {
     "mapreduce.client.genericoptionsparser.used";
   public static final String SUBMIT_REPLICATION = 
     "mapreduce.client.submit.file.replication";
+  public static final int DEFAULT_SUBMIT_REPLICATION = 10;
 
   @InterfaceStability.Evolving
   public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c66c3ac6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
new file mode 100644
index 0000000..eebdf88
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class JobResourceUploader {
+  protected static final Log LOG = LogFactory.getLog(JobResourceUploader.class);
+  private FileSystem jtFs;
+
+  JobResourceUploader(FileSystem submitFs) {
+    this.jtFs = submitFs;
+  }
+
+  /**
+   * Upload and configure files, libjars, jobjars, and archives pertaining to
+   * the passed job.
+   * 
+   * @param job the job containing the files to be uploaded
+   * @param submitJobDir the submission directory of the job
+   * @throws IOException
+   */
+  public void uploadFiles(Job job, Path submitJobDir) throws IOException {
+    Configuration conf = job.getConfiguration();
+    short replication =
+        (short) conf.getInt(Job.SUBMIT_REPLICATION,
+            Job.DEFAULT_SUBMIT_REPLICATION);
+
+    if (!(conf.getBoolean(Job.USED_GENERIC_PARSER, false))) {
+      LOG.warn("Hadoop command-line option parsing not performed. "
+          + "Implement the Tool interface and execute your application "
+          + "with ToolRunner to remedy this.");
+    }
+
+    // get all the command line arguments passed in by the user conf
+    String files = conf.get("tmpfiles");
+    String libjars = conf.get("tmpjars");
+    String archives = conf.get("tmparchives");
+    String jobJar = job.getJar();
+
+    //
+    // Figure out what fs the JobTracker is using. Copy the
+    // job to it, under a temporary name. This allows DFS to work,
+    // and under the local fs also provides UNIX-like object loading
+    // semantics. (that is, if the job file is deleted right after
+    // submission, we can still run the submission to completion)
+    //
+
+    // Create a number of filenames in the JobTracker's fs namespace
+    LOG.debug("default FileSystem: " + jtFs.getUri());
+    if (jtFs.exists(submitJobDir)) {
+      throw new IOException("Not submitting job. Job directory " + submitJobDir
+          + " already exists!! This is unexpected.Please check what's there in"
+          + " that directory");
+    }
+    submitJobDir = jtFs.makeQualified(submitJobDir);
+    submitJobDir = new Path(submitJobDir.toUri().getPath());
+    FsPermission mapredSysPerms =
+        new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
+    FileSystem.mkdirs(jtFs, submitJobDir, mapredSysPerms);
+    Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
+    Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
+    Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
+    // add all the command line files/ jars and archive
+    // first copy them to jobtrackers filesystem
+
+    if (files != null) {
+      FileSystem.mkdirs(jtFs, filesDir, mapredSysPerms);
+      String[] fileArr = files.split(",");
+      for (String tmpFile : fileArr) {
+        URI tmpURI = null;
+        try {
+          tmpURI = new URI(tmpFile);
+        } catch (URISyntaxException e) {
+          throw new IllegalArgumentException(e);
+        }
+        Path tmp = new Path(tmpURI);
+        Path newPath = copyRemoteFiles(filesDir, tmp, conf, replication);
+        try {
+          URI pathURI = getPathURI(newPath, tmpURI.getFragment());
+          DistributedCache.addCacheFile(pathURI, conf);
+        } catch (URISyntaxException ue) {
+          // should not throw a uri exception
+          throw new IOException("Failed to create uri for " + tmpFile, ue);
+        }
+      }
+    }
+
+    if (libjars != null) {
+      FileSystem.mkdirs(jtFs, libjarsDir, mapredSysPerms);
+      String[] libjarsArr = libjars.split(",");
+      for (String tmpjars : libjarsArr) {
+        Path tmp = new Path(tmpjars);
+        Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication);
+        DistributedCache.addFileToClassPath(
+            new Path(newPath.toUri().getPath()), conf);
+      }
+    }
+
+    if (archives != null) {
+      FileSystem.mkdirs(jtFs, archivesDir, mapredSysPerms);
+      String[] archivesArr = archives.split(",");
+      for (String tmpArchives : archivesArr) {
+        URI tmpURI;
+        try {
+          tmpURI = new URI(tmpArchives);
+        } catch (URISyntaxException e) {
+          throw new IllegalArgumentException(e);
+        }
+        Path tmp = new Path(tmpURI);
+        Path newPath = copyRemoteFiles(archivesDir, tmp, conf, replication);
+        try {
+          URI pathURI = getPathURI(newPath, tmpURI.getFragment());
+          DistributedCache.addCacheArchive(pathURI, conf);
+        } catch (URISyntaxException ue) {
+          // should not throw an uri excpetion
+          throw new IOException("Failed to create uri for " + tmpArchives, ue);
+        }
+      }
+    }
+
+    if (jobJar != null) { // copy jar to JobTracker's fs
+      // use jar name if job is not named.
+      if ("".equals(job.getJobName())) {
+        job.setJobName(new Path(jobJar).getName());
+      }
+      Path jobJarPath = new Path(jobJar);
+      URI jobJarURI = jobJarPath.toUri();
+      // If the job jar is already in a global fs,
+      // we don't need to copy it from local fs
+      if (jobJarURI.getScheme() == null || jobJarURI.getScheme().equals("file")) {
+        copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir),
+            replication);
+        job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
+      }
+    } else {
+      LOG.warn("No job jar file set.  User classes may not be found. "
+          + "See Job or Job#setJar(String).");
+    }
+
+    addLog4jToDistributedCache(job, submitJobDir);
+
+    // set the timestamps of the archives and files
+    // set the public/private visibility of the archives and files
+    ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf);
+    // get DelegationToken for cached file
+    ClientDistributedCacheManager.getDelegationTokens(conf,
+        job.getCredentials());
+  }
+
+  // copies a file to the jobtracker filesystem and returns the path where it
+  // was copied to
+  private Path copyRemoteFiles(Path parentDir, Path originalPath,
+      Configuration conf, short replication) throws IOException {
+    // check if we do not need to copy the files
+    // is jt using the same file system.
+    // just checking for uri strings... doing no dns lookups
+    // to see if the filesystems are the same. This is not optimal.
+    // but avoids name resolution.
+
+    FileSystem remoteFs = null;
+    remoteFs = originalPath.getFileSystem(conf);
+    if (compareFs(remoteFs, jtFs)) {
+      return originalPath;
+    }
+    // this might have name collisions. copy will throw an exception
+    // parse the original path to create new path
+    Path newPath = new Path(parentDir, originalPath.getName());
+    FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, conf);
+    jtFs.setReplication(newPath, replication);
+    return newPath;
+  }
+
+  /*
+   * see if two file systems are the same or not.
+   */
+  private boolean compareFs(FileSystem srcFs, FileSystem destFs) {
+    URI srcUri = srcFs.getUri();
+    URI dstUri = destFs.getUri();
+    if (srcUri.getScheme() == null) {
+      return false;
+    }
+    if (!srcUri.getScheme().equals(dstUri.getScheme())) {
+      return false;
+    }
+    String srcHost = srcUri.getHost();
+    String dstHost = dstUri.getHost();
+    if ((srcHost != null) && (dstHost != null)) {
+      try {
+        srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
+        dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
+      } catch (UnknownHostException ue) {
+        return false;
+      }
+      if (!srcHost.equals(dstHost)) {
+        return false;
+      }
+    } else if (srcHost == null && dstHost != null) {
+      return false;
+    } else if (srcHost != null && dstHost == null) {
+      return false;
+    }
+    // check for ports
+    if (srcUri.getPort() != dstUri.getPort()) {
+      return false;
+    }
+    return true;
+  }
+
+  private void copyJar(Path originalJarPath, Path submitJarFile,
+      short replication) throws IOException {
+    jtFs.copyFromLocalFile(originalJarPath, submitJarFile);
+    jtFs.setReplication(submitJarFile, replication);
+    jtFs.setPermission(submitJarFile, new FsPermission(
+        JobSubmissionFiles.JOB_FILE_PERMISSION));
+  }
+
+  private void addLog4jToDistributedCache(Job job, Path jobSubmitDir)
+      throws IOException {
+    Configuration conf = job.getConfiguration();
+    String log4jPropertyFile =
+        conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE, "");
+    if (!log4jPropertyFile.isEmpty()) {
+      short replication = (short) conf.getInt(Job.SUBMIT_REPLICATION, 10);
+      copyLog4jPropertyFile(job, jobSubmitDir, replication);
+    }
+  }
+
+  private URI getPathURI(Path destPath, String fragment)
+      throws URISyntaxException {
+    URI pathURI = destPath.toUri();
+    if (pathURI.getFragment() == null) {
+      if (fragment == null) {
+        pathURI = new URI(pathURI.toString() + "#" + destPath.getName());
+      } else {
+        pathURI = new URI(pathURI.toString() + "#" + fragment);
+      }
+    }
+    return pathURI;
+  }
+
+  // copy user specified log4j.property file in local
+  // to HDFS with putting on distributed cache and adding its parent directory
+  // to classpath.
+  @SuppressWarnings("deprecation")
+  private void copyLog4jPropertyFile(Job job, Path submitJobDir,
+      short replication) throws IOException {
+    Configuration conf = job.getConfiguration();
+
+    String file =
+        validateFilePath(
+            conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE), conf);
+    LOG.debug("default FileSystem: " + jtFs.getUri());
+    FsPermission mapredSysPerms =
+        new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
+    if (!jtFs.exists(submitJobDir)) {
+      throw new IOException("Cannot find job submission directory! "
+          + "It should just be created, so something wrong here.");
+    }
+
+    Path fileDir = JobSubmissionFiles.getJobLog4jFile(submitJobDir);
+
+    // first copy local log4j.properties file to HDFS under submitJobDir
+    if (file != null) {
+      FileSystem.mkdirs(jtFs, fileDir, mapredSysPerms);
+      URI tmpURI = null;
+      try {
+        tmpURI = new URI(file);
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e);
+      }
+      Path tmp = new Path(tmpURI);
+      Path newPath = copyRemoteFiles(fileDir, tmp, conf, replication);
+      DistributedCache.addFileToClassPath(new Path(newPath.toUri().getPath()),
+          conf);
+    }
+  }
+
+  /**
+   * takes input as a path string for file and verifies if it exist. It defaults
+   * for file:/// if the files specified do not have a scheme. it returns the
+   * paths uri converted defaulting to file:///. So an input of /home/user/file1
+   * would return file:///home/user/file1
+   * 
+   * @param file
+   * @param conf
+   * @return
+   */
+  private String validateFilePath(String file, Configuration conf)
+      throws IOException {
+    if (file == null) {
+      return null;
+    }
+    if (file.isEmpty()) {
+      throw new IllegalArgumentException("File name can't be empty string");
+    }
+    String finalPath;
+    URI pathURI;
+    try {
+      pathURI = new URI(file);
+    } catch (URISyntaxException e) {
+      throw new IllegalArgumentException(e);
+    }
+    Path path = new Path(pathURI);
+    FileSystem localFs = FileSystem.getLocal(conf);
+    if (pathURI.getScheme() == null) {
+      // default to the local file system
+      // check if the file exists or not first
+      if (!localFs.exists(path)) {
+        throw new FileNotFoundException("File " + file + " does not exist.");
+      }
+      finalPath =
+          path.makeQualified(localFs.getUri(), localFs.getWorkingDirectory())
+              .toString();
+    } else {
+      // check if the file exists in this file system
+      // we need to recreate this filesystem object to copy
+      // these files to the file system ResourceManager is running
+      // on.
+      FileSystem fs = path.getFileSystem(conf);
+      if (!fs.exists(path)) {
+        throw new FileNotFoundException("File " + file + " does not exist.");
+      }
+      finalPath =
+          path.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString();
+    }
+    return finalPath;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c66c3ac6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
index 75357f7..30a87c7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
@@ -86,297 +86,6 @@ class JobSubmitter {
     this.submitClient = submitClient;
     this.jtFs = submitFs;
   }
-  /*
-   * see if two file systems are the same or not.
-   */
-  private boolean compareFs(FileSystem srcFs, FileSystem destFs) {
-    URI srcUri = srcFs.getUri();
-    URI dstUri = destFs.getUri();
-    if (srcUri.getScheme() == null) {
-      return false;
-    }
-    if (!srcUri.getScheme().equals(dstUri.getScheme())) {
-      return false;
-    }
-    String srcHost = srcUri.getHost();    
-    String dstHost = dstUri.getHost();
-    if ((srcHost != null) && (dstHost != null)) {
-      try {
-        srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
-        dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
-      } catch(UnknownHostException ue) {
-        return false;
-      }
-      if (!srcHost.equals(dstHost)) {
-        return false;
-      }
-    } else if (srcHost == null && dstHost != null) {
-      return false;
-    } else if (srcHost != null && dstHost == null) {
-      return false;
-    }
-    //check for ports
-    if (srcUri.getPort() != dstUri.getPort()) {
-      return false;
-    }
-    return true;
-  }
-
-  // copies a file to the jobtracker filesystem and returns the path where it
-  // was copied to
-  private Path copyRemoteFiles(Path parentDir,
-      Path originalPath, Configuration conf, short replication) 
-      throws IOException {
-    //check if we do not need to copy the files
-    // is jt using the same file system.
-    // just checking for uri strings... doing no dns lookups 
-    // to see if the filesystems are the same. This is not optimal.
-    // but avoids name resolution.
-    
-    FileSystem remoteFs = null;
-    remoteFs = originalPath.getFileSystem(conf);
-    if (compareFs(remoteFs, jtFs)) {
-      return originalPath;
-    }
-    // this might have name collisions. copy will throw an exception
-    //parse the original path to create new path
-    Path newPath = new Path(parentDir, originalPath.getName());
-    FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, conf);
-    jtFs.setReplication(newPath, replication);
-    return newPath;
-  }
-
-  // configures -files, -libjars and -archives.
-  private void copyAndConfigureFiles(Job job, Path submitJobDir,
-      short replication) throws IOException {
-    Configuration conf = job.getConfiguration();
-    if (!(conf.getBoolean(Job.USED_GENERIC_PARSER, false))) {
-      LOG.warn("Hadoop command-line option parsing not performed. " +
-               "Implement the Tool interface and execute your application " +
-               "with ToolRunner to remedy this.");
-    }
-
-    // get all the command line arguments passed in by the user conf
-    String files = conf.get("tmpfiles");
-    String libjars = conf.get("tmpjars");
-    String archives = conf.get("tmparchives");
-    String jobJar = job.getJar();
-
-    //
-    // Figure out what fs the JobTracker is using.  Copy the
-    // job to it, under a temporary name.  This allows DFS to work,
-    // and under the local fs also provides UNIX-like object loading 
-    // semantics.  (that is, if the job file is deleted right after
-    // submission, we can still run the submission to completion)
-    //
-
-    // Create a number of filenames in the JobTracker's fs namespace
-    LOG.debug("default FileSystem: " + jtFs.getUri());
-    if (jtFs.exists(submitJobDir)) {
-      throw new IOException("Not submitting job. Job directory " + submitJobDir
-          +" already exists!! This is unexpected.Please check what's there in" +
-          " that directory");
-    }
-    submitJobDir = jtFs.makeQualified(submitJobDir);
-    submitJobDir = new Path(submitJobDir.toUri().getPath());
-    FsPermission mapredSysPerms = new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
-    FileSystem.mkdirs(jtFs, submitJobDir, mapredSysPerms);
-    Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
-    Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
-    Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
-    // add all the command line files/ jars and archive
-    // first copy them to jobtrackers filesystem 
-      
-    if (files != null) {
-      FileSystem.mkdirs(jtFs, filesDir, mapredSysPerms);
-      String[] fileArr = files.split(",");
-      for (String tmpFile: fileArr) {
-        URI tmpURI = null;
-        try {
-          tmpURI = new URI(tmpFile);
-        } catch (URISyntaxException e) {
-          throw new IllegalArgumentException(e);
-        }
-        Path tmp = new Path(tmpURI);
-        Path newPath = copyRemoteFiles(filesDir, tmp, conf, replication);
-        try {
-          URI pathURI = getPathURI(newPath, tmpURI.getFragment());
-          DistributedCache.addCacheFile(pathURI, conf);
-        } catch(URISyntaxException ue) {
-          //should not throw a uri exception 
-          throw new IOException("Failed to create uri for " + tmpFile, ue);
-        }
-      }
-    }
-      
-    if (libjars != null) {
-      FileSystem.mkdirs(jtFs, libjarsDir, mapredSysPerms);
-      String[] libjarsArr = libjars.split(",");
-      for (String tmpjars: libjarsArr) {
-        Path tmp = new Path(tmpjars);
-        Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication);
-        DistributedCache.addFileToClassPath(
-            new Path(newPath.toUri().getPath()), conf);
-      }
-    }
-      
-    if (archives != null) {
-      FileSystem.mkdirs(jtFs, archivesDir, mapredSysPerms); 
-      String[] archivesArr = archives.split(",");
-      for (String tmpArchives: archivesArr) {
-        URI tmpURI;
-        try {
-          tmpURI = new URI(tmpArchives);
-        } catch (URISyntaxException e) {
-          throw new IllegalArgumentException(e);
-        }
-        Path tmp = new Path(tmpURI);
-        Path newPath = copyRemoteFiles(archivesDir, tmp, conf,
-          replication);
-        try {
-          URI pathURI = getPathURI(newPath, tmpURI.getFragment());
-          DistributedCache.addCacheArchive(pathURI, conf);
-        } catch(URISyntaxException ue) {
-          //should not throw an uri excpetion
-          throw new IOException("Failed to create uri for " + tmpArchives, ue);
-        }
-      }
-    }
-
-    if (jobJar != null) {   // copy jar to JobTracker's fs
-      // use jar name if job is not named. 
-      if ("".equals(job.getJobName())){
-        job.setJobName(new Path(jobJar).getName());
-      }
-      Path jobJarPath = new Path(jobJar);
-      URI jobJarURI = jobJarPath.toUri();
-      // If the job jar is already in a global fs,
-      // we don't need to copy it from local fs
-      if (     jobJarURI.getScheme() == null
-            || jobJarURI.getScheme().equals("file")) {
-        copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir), 
-            replication);
-        job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
-      }
-    } else {
-      LOG.warn("No job jar file set.  User classes may not be found. "+
-      "See Job or Job#setJar(String).");
-    }
-    
-    addLog4jToDistributedCache(job, submitJobDir);
-    
-    //  set the timestamps of the archives and files
-    //  set the public/private visibility of the archives and files
-    ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf);
-    // get DelegationToken for cached file
-    ClientDistributedCacheManager.getDelegationTokens(conf, job
-        .getCredentials());
-  }
-  
-  // copy user specified log4j.property file in local 
-  // to HDFS with putting on distributed cache and adding its parent directory 
-  // to classpath.
-  @SuppressWarnings("deprecation")
-  private void copyLog4jPropertyFile(Job job, Path submitJobDir,
-      short replication) throws IOException {
-    Configuration conf = job.getConfiguration();
-
-    String file = validateFilePath(
-        conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE), conf);
-    LOG.debug("default FileSystem: " + jtFs.getUri());
-    FsPermission mapredSysPerms = 
-      new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
-    if (!jtFs.exists(submitJobDir)) {
-      throw new IOException("Cannot find job submission directory! " 
-          + "It should just be created, so something wrong here.");
-    }
-    
-    Path fileDir = JobSubmissionFiles.getJobLog4jFile(submitJobDir);
-
-    // first copy local log4j.properties file to HDFS under submitJobDir
-    if (file != null) {
-      FileSystem.mkdirs(jtFs, fileDir, mapredSysPerms);
-      URI tmpURI = null;
-      try {
-        tmpURI = new URI(file);
-      } catch (URISyntaxException e) {
-        throw new IllegalArgumentException(e);
-      }
-      Path tmp = new Path(tmpURI);
-      Path newPath = copyRemoteFiles(fileDir, tmp, conf, replication);
-      DistributedCache.addFileToClassPath(new Path(newPath.toUri().getPath()), conf);
-    }
-  }
-  
-  /**
-   * takes input as a path string for file and verifies if it exist. 
-   * It defaults for file:/// if the files specified do not have a scheme.
-   * it returns the paths uri converted defaulting to file:///.
-   * So an input of  /home/user/file1 would return file:///home/user/file1
-   * @param file
-   * @param conf
-   * @return
-   */
-  private String validateFilePath(String file, Configuration conf) 
-      throws IOException  {
-    if (file == null) {
-      return null;
-    }
-    if (file.isEmpty()) {
-      throw new IllegalArgumentException("File name can't be empty string");
-    }
-    String finalPath;
-    URI pathURI;
-    try {
-      pathURI = new URI(file);
-    } catch (URISyntaxException e) {
-      throw new IllegalArgumentException(e);
-    }
-    Path path = new Path(pathURI);
-    FileSystem localFs = FileSystem.getLocal(conf);
-    if (pathURI.getScheme() == null) {
-      //default to the local file system
-      //check if the file exists or not first
-      if (!localFs.exists(path)) {
-        throw new FileNotFoundException("File " + file + " does not exist.");
-      }
-      finalPath = path.makeQualified(localFs.getUri(),
-          localFs.getWorkingDirectory()).toString();
-    }
-    else {
-      // check if the file exists in this file system
-      // we need to recreate this filesystem object to copy
-      // these files to the file system ResourceManager is running
-      // on.
-      FileSystem fs = path.getFileSystem(conf);
-      if (!fs.exists(path)) {
-        throw new FileNotFoundException("File " + file + " does not exist.");
-      }
-      finalPath = path.makeQualified(fs.getUri(),
-          fs.getWorkingDirectory()).toString();
-    }
-    return finalPath;
-  }
-  
-  private URI getPathURI(Path destPath, String fragment) 
-      throws URISyntaxException {
-    URI pathURI = destPath.toUri();
-    if (pathURI.getFragment() == null) {
-      if (fragment == null) {
-        pathURI = new URI(pathURI.toString() + "#" + destPath.getName());
-      } else {
-        pathURI = new URI(pathURI.toString() + "#" + fragment);
-      }
-    }
-    return pathURI;
-  }
-  
-  private void copyJar(Path originalJarPath, Path submitJarFile,
-      short replication) throws IOException {
-    jtFs.copyFromLocalFile(originalJarPath, submitJarFile);
-    jtFs.setReplication(submitJarFile, replication);
-    jtFs.setPermission(submitJarFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
-  }
   
   /**
    * configure the jobconf of the user with the command line options of 
@@ -386,9 +95,8 @@ class JobSubmitter {
    */
   private void copyAndConfigureFiles(Job job, Path jobSubmitDir) 
   throws IOException {
-    Configuration conf = job.getConfiguration();
-    short replication = (short)conf.getInt(Job.SUBMIT_REPLICATION, 10);
-    copyAndConfigureFiles(job, jobSubmitDir, replication);
+    JobResourceUploader rUploader = new JobResourceUploader(jtFs);
+    rUploader.uploadFiles(job, jobSubmitDir);
 
     // Get the working directory. If not set, sets it to filesystem working dir
     // This code has been added so that working directory reset before running
@@ -396,8 +104,8 @@ class JobSubmitter {
     // might use the public API JobConf#setWorkingDirectory to reset the working
     // directory.
     job.getWorkingDirectory();
-
   }
+
   /**
    * Internal method for submitting jobs to the system.
    * 
@@ -484,10 +192,7 @@ class JobSubmitter {
       }
 
       copyAndConfigureFiles(job, submitJobDir);
-      
-      
 
-      
       Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
       
       // Create the splits for the job
@@ -766,15 +471,4 @@ class JobSubmitter {
       DistributedCache.addCacheArchive(uri, conf);
     }
   }
-  
-  private void addLog4jToDistributedCache(Job job,
-      Path jobSubmitDir) throws IOException {
-    Configuration conf = job.getConfiguration();
-    String log4jPropertyFile =
-        conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE, "");
-    if (!log4jPropertyFile.isEmpty()) {
-      short replication = (short)conf.getInt(Job.SUBMIT_REPLICATION, 10);
-      copyLog4jPropertyFile(job, jobSubmitDir, replication);
-    }
-  }
 }