You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to mapreduce-commits@hadoop.apache.org by wa...@apache.org on 2014/07/21 23:45:00 UTC

svn commit: r1612403 - in /hadoop/common/branches/fs-encryption/hadoop-mapreduce-project: ./ bin/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/j...

Author: wang
Date: Mon Jul 21 21:44:50 2014
New Revision: 1612403

URL: http://svn.apache.org/r1612403
Log:
Merge from trunk to branch

Modified:
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java

Propchange: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1610851-1612402

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt Mon Jul 21 21:44:50 2014
@@ -17,6 +17,9 @@ Trunk (Unreleased)
     MAPREDUCE-5232. Add a configuration to be able to log classpath and other
     system properties on mapreduce JVMs startup.  (Sangjin Lee via vinodkv)
 
+    MAPREDUCE-5910. Make MR AM resync with RM in case of work-preserving
+    RM-restart. (Rohith via jianhe)
+
   IMPROVEMENTS
 
     MAPREDUCE-3481. [Gridmix] Improve Gridmix STRESS mode. (amarrk)
@@ -153,6 +156,9 @@ Release 2.6.0 - UNRELEASED
 
   IMPROVEMENTS
 
+    MAPREDUCE-5971. Move the default options for distcp -p to
+    DistCpOptionSwitch. (clamb via wang)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -163,6 +169,12 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-5956. Made MR AM not use maxAttempts to determine if the current
     attempt is the last retry. (Wangda Tan via zjshen)
 
+    MAPREDUCE-5957. AM throws ClassNotFoundException with job classloader
+    enabled if custom output format/committer is used (Sangjin Lee via jlowe)
+
+    MAPREDUCE-5756. CombineFileInputFormat.getSplits() including directories
+    in its results (Jason Dere via jlowe)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -307,6 +319,9 @@ Release 2.5.0 - UNRELEASED
     resource configuration for deciding uber-mode on map-only jobs. (Siqi Li via
     vinodkv)
 
+    MAPREDUCE-5952. LocalContainerLauncher#renameMapOutputForReduce incorrectly 
+    assumes a single dir for mapOutIndex. (Gera Shegalov via kasha)
+
 Release 2.4.1 - 2014-06-23 
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1610851-1612402

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh Mon Jul 21 21:44:50 2014
@@ -133,6 +133,7 @@ case $startStop in
       else
         echo no $command to stop
       fi
+      rm -f $pid
     else
       echo no $command to stop
     fi

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java Mon Jul 21 21:44:50 2014
@@ -30,6 +30,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSError;
@@ -438,43 +439,6 @@ public class LocalContainerLauncher exte
     }
 
     /**
-     * Within the _local_ filesystem (not HDFS), all activity takes place within
-     * a single subdir (${local.dir}/usercache/$user/appcache/$appId/$contId/),
-     * and all sub-MapTasks create the same filename ("file.out").  Rename that
-     * to something unique (e.g., "map_0.out") to avoid collisions.
-     *
-     * Longer-term, we'll modify [something] to use TaskAttemptID-based
-     * filenames instead of "file.out". (All of this is entirely internal,
-     * so there are no particular compatibility issues.)
-     */
-    private MapOutputFile renameMapOutputForReduce(JobConf conf,
-        TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
-      FileSystem localFs = FileSystem.getLocal(conf);
-      // move map output to reduce input
-      Path mapOut = subMapOutputFile.getOutputFile();
-      FileStatus mStatus = localFs.getFileStatus(mapOut);      
-      Path reduceIn = subMapOutputFile.getInputFileForWrite(
-          TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
-      Path mapOutIndex = new Path(mapOut.toString() + ".index");
-      Path reduceInIndex = new Path(reduceIn.toString() + ".index");
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Renaming map output file for task attempt "
-            + mapId.toString() + " from original location " + mapOut.toString()
-            + " to destination " + reduceIn.toString());
-      }
-      if (!localFs.mkdirs(reduceIn.getParent())) {
-        throw new IOException("Mkdirs failed to create "
-            + reduceIn.getParent().toString());
-      }
-      if (!localFs.rename(mapOut, reduceIn))
-        throw new IOException("Couldn't rename " + mapOut);
-      if (!localFs.rename(mapOutIndex, reduceInIndex))
-        throw new IOException("Couldn't rename " + mapOutIndex);
-      
-      return new RenamedMapOutputFile(reduceIn);
-    }
-
-    /**
      * Also within the local filesystem, we need to restore the initial state
      * of the directory as much as possible.  Compare current contents against
      * the saved original state and nuke everything that doesn't belong, with
@@ -506,7 +470,46 @@ public class LocalContainerLauncher exte
     }
 
   } // end EventHandler
-  
+
+  /**
+   * Within the _local_ filesystem (not HDFS), all activity takes place within
+   * a subdir inside one of the LOCAL_DIRS
+   * (${local.dir}/usercache/$user/appcache/$appId/$contId/),
+   * and all sub-MapTasks create the same filename ("file.out").  Rename that
+   * to something unique (e.g., "map_0.out") to avoid possible collisions.
+   *
+   * Longer-term, we'll modify [something] to use TaskAttemptID-based
+   * filenames instead of "file.out". (All of this is entirely internal,
+   * so there are no particular compatibility issues.)
+   */
+  @VisibleForTesting
+  protected static MapOutputFile renameMapOutputForReduce(JobConf conf,
+      TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    // move map output to reduce input
+    Path mapOut = subMapOutputFile.getOutputFile();
+    FileStatus mStatus = localFs.getFileStatus(mapOut);
+    Path reduceIn = subMapOutputFile.getInputFileForWrite(
+        TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
+    Path mapOutIndex = subMapOutputFile.getOutputIndexFile();
+    Path reduceInIndex = new Path(reduceIn.toString() + ".index");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Renaming map output file for task attempt "
+          + mapId.toString() + " from original location " + mapOut.toString()
+          + " to destination " + reduceIn.toString());
+    }
+    if (!localFs.mkdirs(reduceIn.getParent())) {
+      throw new IOException("Mkdirs failed to create "
+          + reduceIn.getParent().toString());
+    }
+    if (!localFs.rename(mapOut, reduceIn))
+      throw new IOException("Couldn't rename " + mapOut);
+    if (!localFs.rename(mapOutIndex, reduceInIndex))
+      throw new IOException("Couldn't rename " + mapOutIndex);
+
+    return new RenamedMapOutputFile(reduceIn);
+  }
+
   private static class RenamedMapOutputFile extends MapOutputFile {
     private Path path;
     

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Mon Jul 21 21:44:50 2014
@@ -41,7 +41,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.LocalContainerLauncher;
@@ -200,6 +199,7 @@ public class MRAppMaster extends Composi
       new JobTokenSecretManager();
   private JobId jobId;
   private boolean newApiCommitter;
+  private ClassLoader jobClassLoader;
   private OutputCommitter committer;
   private JobEventDispatcher jobEventDispatcher;
   private JobHistoryEventHandler jobHistoryEventHandler;
@@ -250,6 +250,9 @@ public class MRAppMaster extends Composi
 
   @Override
   protected void serviceInit(final Configuration conf) throws Exception {
+    // create the job classloader if enabled
+    createJobClassLoader(conf);
+
     conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
 
     initJobCredentialsAndUGI(conf);
@@ -387,8 +390,13 @@ public class MRAppMaster extends Composi
       addIfService(committerEventHandler);
 
       //policy handling preemption requests from RM
-      preemptionPolicy = createPreemptionPolicy(conf);
-      preemptionPolicy.init(context);
+      callWithJobClassLoader(conf, new Action<Void>() {
+        public Void call(Configuration conf) {
+          preemptionPolicy = createPreemptionPolicy(conf);
+          preemptionPolicy.init(context);
+          return null;
+        }
+      });
 
       //service to handle requests to TaskUmbilicalProtocol
       taskAttemptListener = createTaskAttemptListener(context, preemptionPolicy);
@@ -453,33 +461,37 @@ public class MRAppMaster extends Composi
   }
 
   private OutputCommitter createOutputCommitter(Configuration conf) {
-    OutputCommitter committer = null;
-
-    LOG.info("OutputCommitter set in config "
-        + conf.get("mapred.output.committer.class"));
-
-    if (newApiCommitter) {
-      org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID = MRBuilderUtils
-          .newTaskId(jobId, 0, TaskType.MAP);
-      org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = MRBuilderUtils
-          .newTaskAttemptId(taskID, 0);
-      TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
-          TypeConverter.fromYarn(attemptID));
-      OutputFormat outputFormat;
-      try {
-        outputFormat = ReflectionUtils.newInstance(taskContext
-            .getOutputFormatClass(), conf);
-        committer = outputFormat.getOutputCommitter(taskContext);
-      } catch (Exception e) {
-        throw new YarnRuntimeException(e);
+    return callWithJobClassLoader(conf, new Action<OutputCommitter>() {
+      public OutputCommitter call(Configuration conf) {
+        OutputCommitter committer = null;
+
+        LOG.info("OutputCommitter set in config "
+            + conf.get("mapred.output.committer.class"));
+
+        if (newApiCommitter) {
+          org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID =
+              MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+          org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
+              MRBuilderUtils.newTaskAttemptId(taskID, 0);
+          TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
+              TypeConverter.fromYarn(attemptID));
+          OutputFormat outputFormat;
+          try {
+            outputFormat = ReflectionUtils.newInstance(taskContext
+                .getOutputFormatClass(), conf);
+            committer = outputFormat.getOutputCommitter(taskContext);
+          } catch (Exception e) {
+            throw new YarnRuntimeException(e);
+          }
+        } else {
+          committer = ReflectionUtils.newInstance(conf.getClass(
+              "mapred.output.committer.class", FileOutputCommitter.class,
+              org.apache.hadoop.mapred.OutputCommitter.class), conf);
+        }
+        LOG.info("OutputCommitter is " + committer.getClass().getName());
+        return committer;
       }
-    } else {
-      committer = ReflectionUtils.newInstance(conf.getClass(
-          "mapred.output.committer.class", FileOutputCommitter.class,
-          org.apache.hadoop.mapred.OutputCommitter.class), conf);
-    }
-    LOG.info("OutputCommitter is " + committer.getClass().getName());
-    return committer;
+    });
   }
 
   protected AMPreemptionPolicy createPreemptionPolicy(Configuration conf) {
@@ -667,38 +679,42 @@ public class MRAppMaster extends Composi
     return new StagingDirCleaningService();
   }
 
-  protected Speculator createSpeculator(Configuration conf, AppContext context) {
-    Class<? extends Speculator> speculatorClass;
-
-    try {
-      speculatorClass
-          // "yarn.mapreduce.job.speculator.class"
-          = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR,
-                          DefaultSpeculator.class,
-                          Speculator.class);
-      Constructor<? extends Speculator> speculatorConstructor
-          = speculatorClass.getConstructor
-               (Configuration.class, AppContext.class);
-      Speculator result = speculatorConstructor.newInstance(conf, context);
-
-      return result;
-    } catch (InstantiationException ex) {
-      LOG.error("Can't make a speculator -- check "
-          + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
-      throw new YarnRuntimeException(ex);
-    } catch (IllegalAccessException ex) {
-      LOG.error("Can't make a speculator -- check "
-          + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
-      throw new YarnRuntimeException(ex);
-    } catch (InvocationTargetException ex) {
-      LOG.error("Can't make a speculator -- check "
-          + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
-      throw new YarnRuntimeException(ex);
-    } catch (NoSuchMethodException ex) {
-      LOG.error("Can't make a speculator -- check "
-          + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
-      throw new YarnRuntimeException(ex);
-    }
+  protected Speculator createSpeculator(Configuration conf,
+      final AppContext context) {
+    return callWithJobClassLoader(conf, new Action<Speculator>() {
+      public Speculator call(Configuration conf) {
+        Class<? extends Speculator> speculatorClass;
+        try {
+          speculatorClass
+              // "yarn.mapreduce.job.speculator.class"
+              = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR,
+                              DefaultSpeculator.class,
+                              Speculator.class);
+          Constructor<? extends Speculator> speculatorConstructor
+              = speculatorClass.getConstructor
+                   (Configuration.class, AppContext.class);
+          Speculator result = speculatorConstructor.newInstance(conf, context);
+
+          return result;
+        } catch (InstantiationException ex) {
+          LOG.error("Can't make a speculator -- check "
+              + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+          throw new YarnRuntimeException(ex);
+        } catch (IllegalAccessException ex) {
+          LOG.error("Can't make a speculator -- check "
+              + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+          throw new YarnRuntimeException(ex);
+        } catch (InvocationTargetException ex) {
+          LOG.error("Can't make a speculator -- check "
+              + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+          throw new YarnRuntimeException(ex);
+        } catch (NoSuchMethodException ex) {
+          LOG.error("Can't make a speculator -- check "
+              + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+          throw new YarnRuntimeException(ex);
+        }
+      }
+    });
   }
 
   protected TaskAttemptListener createTaskAttemptListener(AppContext context,
@@ -712,7 +728,7 @@ public class MRAppMaster extends Composi
   protected EventHandler<CommitterEvent> createCommitterEventHandler(
       AppContext context, OutputCommitter committer) {
     return new CommitterEventHandler(context, committer,
-        getRMHeartbeatHandler());
+        getRMHeartbeatHandler(), jobClassLoader);
   }
 
   protected ContainerAllocator createContainerAllocator(
@@ -1083,8 +1099,8 @@ public class MRAppMaster extends Composi
     //start all the components
     super.serviceStart();
 
-    // set job classloader if configured
-    MRApps.setJobClassLoader(getConfig());
+    // finally set the job classloader
+    MRApps.setClassLoader(jobClassLoader, getConfig());
 
     if (initFailed) {
       JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
@@ -1101,19 +1117,24 @@ public class MRAppMaster extends Composi
     TaskLog.syncLogsShutdown(logSyncer);
   }
 
-  private boolean isRecoverySupported(OutputCommitter committer2)
-      throws IOException {
+  private boolean isRecoverySupported() throws IOException {
     boolean isSupported = false;
-    JobContext _jobContext;
+    Configuration conf = getConfig();
     if (committer != null) {
+      final JobContext _jobContext;
       if (newApiCommitter) {
          _jobContext = new JobContextImpl(
-            getConfig(), TypeConverter.fromYarn(getJobId()));
+            conf, TypeConverter.fromYarn(getJobId()));
       } else {
           _jobContext = new org.apache.hadoop.mapred.JobContextImpl(
-                new JobConf(getConfig()), TypeConverter.fromYarn(getJobId()));
+                new JobConf(conf), TypeConverter.fromYarn(getJobId()));
       }
-      isSupported = committer.isRecoverySupported(_jobContext);
+      isSupported = callWithJobClassLoader(conf,
+          new ExceptionAction<Boolean>() {
+            public Boolean call(Configuration conf) throws IOException {
+              return committer.isRecoverySupported(_jobContext);
+            }
+      });
     }
     return isSupported;
   }
@@ -1127,7 +1148,7 @@ public class MRAppMaster extends Composi
         MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
         MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);
 
-    boolean recoverySupportedByCommitter = isRecoverySupported(committer);
+    boolean recoverySupportedByCommitter = isRecoverySupported();
 
     // If a shuffle secret was not provided by the job client then this app
     // attempt will generate one.  However that disables recovery if there
@@ -1312,7 +1333,7 @@ public class MRAppMaster extends Composi
       this.conf = config;
     }
     @Override
-    public void handle(SpeculatorEvent event) {
+    public void handle(final SpeculatorEvent event) {
       if (disabled) {
         return;
       }
@@ -1339,7 +1360,12 @@ public class MRAppMaster extends Composi
       if ( (shouldMapSpec && (tType == null || tType == TaskType.MAP))
         || (shouldReduceSpec && (tType == null || tType == TaskType.REDUCE))) {
         // Speculator IS enabled, direct the event to there.
-        speculator.handle(event);
+        callWithJobClassLoader(conf, new Action<Void>() {
+          public Void call(Configuration conf) {
+            speculator.handle(event);
+            return null;
+          }
+        });
       }
     }
 
@@ -1499,6 +1525,102 @@ public class MRAppMaster extends Composi
     });
   }
 
+  /**
+   * Creates a job classloader based on the configuration if the job classloader
+   * is enabled. It is a no-op if the job classloader is not enabled.
+   */
+  private void createJobClassLoader(Configuration conf) throws IOException {
+    jobClassLoader = MRApps.createJobClassLoader(conf);
+  }
+
+  /**
+   * Executes the given action with the job classloader set as the configuration
+   * classloader as well as the thread context class loader if the job
+   * classloader is enabled. After the call, the original classloader is
+   * restored.
+   *
+   * If the job classloader is enabled and the code needs to load user-supplied
+   * classes via configuration or thread context classloader, this method should
+   * be used in order to load them.
+   *
+   * @param conf the configuration on which the classloader will be set
+   * @param action the callable action to be executed
+   */
+  <T> T callWithJobClassLoader(Configuration conf, Action<T> action) {
+    // if the job classloader is enabled, we may need it to load the (custom)
+    // classes; we make the job classloader available and unset it once it is
+    // done
+    ClassLoader currentClassLoader = conf.getClassLoader();
+    boolean setJobClassLoader =
+        jobClassLoader != null && currentClassLoader != jobClassLoader;
+    if (setJobClassLoader) {
+      MRApps.setClassLoader(jobClassLoader, conf);
+    }
+    try {
+      return action.call(conf);
+    } finally {
+      if (setJobClassLoader) {
+        // restore the original classloader
+        MRApps.setClassLoader(currentClassLoader, conf);
+      }
+    }
+  }
+
+  /**
+   * Executes the given action that can throw a checked exception with the job
+   * classloader set as the configuration classloader as well as the thread
+   * context class loader if the job classloader is enabled. After the call, the
+   * original classloader is restored.
+   *
+   * If the job classloader is enabled and the code needs to load user-supplied
+   * classes via configuration or thread context classloader, this method should
+   * be used in order to load them.
+   *
+   * @param conf the configuration on which the classloader will be set
+   * @param action the callable action to be executed
+   * @throws IOException if the underlying action throws an IOException
+   * @throws YarnRuntimeException if the underlying action throws an exception
+   * other than an IOException
+   */
+  <T> T callWithJobClassLoader(Configuration conf, ExceptionAction<T> action)
+      throws IOException {
+    // if the job classloader is enabled, we may need it to load the (custom)
+    // classes; we make the job classloader available and unset it once it is
+    // done
+    ClassLoader currentClassLoader = conf.getClassLoader();
+    boolean setJobClassLoader =
+        jobClassLoader != null && currentClassLoader != jobClassLoader;
+    if (setJobClassLoader) {
+      MRApps.setClassLoader(jobClassLoader, conf);
+    }
+    try {
+      return action.call(conf);
+    } catch (IOException e) {
+      throw e;
+    } catch (YarnRuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      // wrap it with a YarnRuntimeException
+      throw new YarnRuntimeException(e);
+    } finally {
+      if (setJobClassLoader) {
+        // restore the original classloader
+        MRApps.setClassLoader(currentClassLoader, conf);
+      }
+    }
+  }
+
+  /**
+   * Action to be wrapped with setting and unsetting the job classloader
+   */
+  private static interface Action<T> {
+    T call(Configuration conf);
+  }
+
+  private static interface ExceptionAction<T> {
+    T call(Configuration conf) throws Exception;
+  }
+
   @Override
   protected void serviceStop() throws Exception {
     super.serviceStop();

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java Mon Jul 21 21:44:50 2014
@@ -68,6 +68,7 @@ public class CommitterEventHandler exten
   private BlockingQueue<CommitterEvent> eventQueue =
       new LinkedBlockingQueue<CommitterEvent>();
   private final AtomicBoolean stopped;
+  private final ClassLoader jobClassLoader;
   private Thread jobCommitThread = null;
   private int commitThreadCancelTimeoutMs;
   private long commitWindowMs;
@@ -79,11 +80,17 @@ public class CommitterEventHandler exten
 
   public CommitterEventHandler(AppContext context, OutputCommitter committer,
       RMHeartbeatHandler rmHeartbeatHandler) {
+    this(context, committer, rmHeartbeatHandler, null);
+  }
+  
+  public CommitterEventHandler(AppContext context, OutputCommitter committer,
+      RMHeartbeatHandler rmHeartbeatHandler, ClassLoader jobClassLoader) {
     super("CommitterEventHandler");
     this.context = context;
     this.committer = committer;
     this.rmHeartbeatHandler = rmHeartbeatHandler;
     this.stopped = new AtomicBoolean(false);
+    this.jobClassLoader = jobClassLoader;
   }
 
   @Override
@@ -109,9 +116,23 @@ public class CommitterEventHandler exten
 
   @Override
   protected void serviceStart() throws Exception {
-    ThreadFactory tf = new ThreadFactoryBuilder()
-      .setNameFormat("CommitterEvent Processor #%d")
-      .build();
+    ThreadFactoryBuilder tfBuilder = new ThreadFactoryBuilder()
+        .setNameFormat("CommitterEvent Processor #%d");
+    if (jobClassLoader != null) {
+      // if the job classloader is enabled, we need to use the job classloader
+      // as the thread context classloader (TCCL) of these threads in case the
+      // committer needs to load another class via TCCL
+      ThreadFactory backingTf = new ThreadFactory() {
+        @Override
+        public Thread newThread(Runnable r) {
+          Thread thread = new Thread(r);
+          thread.setContextClassLoader(jobClassLoader);
+          return thread;
+        }
+      };
+      tfBuilder.setThreadFactory(backingTf);
+    }
+    ThreadFactory tf = tfBuilder.build();
     launcherPool = new ThreadPoolExecutor(5, 5, 1,
         TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
     eventHandlingThread = new Thread(new Runnable() {

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Mon Jul 21 21:44:50 2014
@@ -64,6 +64,7 @@ public class LocalContainerAllocator ext
   private int nmPort;
   private int nmHttpPort;
   private ContainerId containerId;
+  protected int lastResponseID;
 
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
@@ -119,6 +120,11 @@ public class LocalContainerAllocator ext
     if (allocateResponse.getAMCommand() != null) {
       switch(allocateResponse.getAMCommand()) {
       case AM_RESYNC:
+        LOG.info("ApplicationMaster is out of sync with ResourceManager,"
+            + " hence resyncing.");        
+        this.lastResponseID = 0;
+        register();
+        break;
       case AM_SHUTDOWN:
         LOG.info("Event from RM: shutting down Application Master");
         // This can happen if the RM has been restarted. If it is in that state,

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Mon Jul 21 21:44:50 2014
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -216,20 +217,27 @@ public abstract class RMCommunicator ext
     FinishApplicationMasterRequest request =
         FinishApplicationMasterRequest.newInstance(finishState,
           sb.toString(), historyUrl);
-    while (true) {
-      FinishApplicationMasterResponse response =
-          scheduler.finishApplicationMaster(request);
-      if (response.getIsUnregistered()) {
-        // When excepting ClientService, other services are already stopped,
-        // it is safe to let clients know the final states. ClientService
-        // should wait for some time so clients have enough time to know the
-        // final states.
-        RunningAppContext raContext = (RunningAppContext) context;
-        raContext.markSuccessfulUnregistration();
-        break;
+    try {
+      while (true) {
+        FinishApplicationMasterResponse response =
+            scheduler.finishApplicationMaster(request);
+        if (response.getIsUnregistered()) {
+          // When excepting ClientService, other services are already stopped,
+          // it is safe to let clients know the final states. ClientService
+          // should wait for some time so clients have enough time to know the
+          // final states.
+          RunningAppContext raContext = (RunningAppContext) context;
+          raContext.markSuccessfulUnregistration();
+          break;
+        }
+        LOG.info("Waiting for application to be successfully unregistered.");
+        Thread.sleep(rmPollInterval);
       }
-      LOG.info("Waiting for application to be successfully unregistered.");
-      Thread.sleep(rmPollInterval);
+    } catch (ApplicationMasterNotRegisteredException e) {
+      // RM might have restarted or failed over and so lost the fact that AM had
+      // registered before.
+      register();
+      doUnregistration();
     }
   }
 

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Mon Jul 21 21:44:50 2014
@@ -389,6 +389,7 @@ public class RMContainerAllocator extend
           removed = true;
           assignedRequests.remove(aId);
           containersReleased++;
+          pendingRelease.add(containerId);
           release(containerId);
         }
       }
@@ -641,6 +642,15 @@ public class RMContainerAllocator extend
     if (response.getAMCommand() != null) {
       switch(response.getAMCommand()) {
       case AM_RESYNC:
+          LOG.info("ApplicationMaster is out of sync with ResourceManager,"
+              + " hence resyncing.");
+          lastResponseID = 0;
+
+          // Registering to allow RM to discover an active AM for this
+          // application
+          register();
+          addOutstandingRequestOnResync();
+          break;
       case AM_SHUTDOWN:
         // This can happen if the RM has been restarted. If it is in that state,
         // this application must clean itself up.
@@ -700,6 +710,7 @@ public class RMContainerAllocator extend
         LOG.error("Container complete event for unknown container id "
             + cont.getContainerId());
       } else {
+        pendingRelease.remove(cont.getContainerId());
         assignedRequests.remove(attemptID);
         
         // send the container completed event to Task attempt
@@ -991,6 +1002,7 @@ public class RMContainerAllocator extend
     
     private void containerNotAssigned(Container allocated) {
       containersReleased++;
+      pendingRelease.add(allocated.getId());
       release(allocated.getId());      
     }
     

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Mon Jul 21 21:44:50 2014
@@ -40,6 +40,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -58,7 +59,7 @@ public abstract class RMContainerRequest
   
   private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class);
 
-  private int lastResponseID;
+  protected int lastResponseID;
   private Resource availableResources;
 
   private final RecordFactory recordFactory =
@@ -77,8 +78,11 @@ public abstract class RMContainerRequest
   // numContainers dont end up as duplicates
   private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
       new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
-  private final Set<ContainerId> release = new TreeSet<ContainerId>(); 
-
+  private final Set<ContainerId> release = new TreeSet<ContainerId>();
+  // pendingRelease holds the history of release requests. A request is
+  // removed only when the RM sends completedContainer for it.
+  // How is it different from release? --> release is per allocate() request.
+  protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();
   private boolean nodeBlacklistingEnabled;
   private int blacklistDisablePercent;
   private AtomicBoolean ignoreBlacklisting = new AtomicBoolean(false);
@@ -186,6 +190,10 @@ public abstract class RMContainerRequest
     } catch (YarnException e) {
       throw new IOException(e);
     }
+
+    if (isResyncCommand(allocateResponse)) {
+      return allocateResponse;
+    }
     lastResponseID = allocateResponse.getResponseId();
     availableResources = allocateResponse.getAvailableResources();
     lastClusterNmCount = clusterNmCount;
@@ -214,6 +222,28 @@ public abstract class RMContainerRequest
     return allocateResponse;
   }
 
+  protected boolean isResyncCommand(AllocateResponse allocateResponse) {
+    // A null command never equals AM_RESYNC, so no separate null check.
+    return AMCommand.AM_RESYNC == allocateResponse.getAMCommand();
+  }
+
+  protected void addOutstandingRequestOnResync() {
+    // Re-ask for every request still recorded in the remote requests table.
+    for (Map<String, Map<Resource, ResourceRequest>> perKey
+        : remoteRequestsTable.values()) {
+      for (Map<Resource, ResourceRequest> perResource : perKey.values()) {
+        for (ResourceRequest outstanding : perResource.values()) {
+          addResourceRequestToAsk(outstanding);
+        }
+      }
+    }
+    if (!ignoreBlacklisting.get()) {
+      blacklistAdditions.addAll(blacklistedNodes);
+    }
+    // Adding an empty set is a no-op, so no emptiness check is needed.
+    release.addAll(pendingRelease);
+  }
+
   // May be incorrect if there's multiple NodeManagers running on a single host.
   // knownNodeCount is based on node managers, not hosts. blacklisting is
   // currently based on hosts.

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java Mon Jul 21 21:44:50 2014
@@ -18,17 +18,26 @@
 
 package org.apache.hadoop.mapred;
 
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -46,6 +55,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -53,6 +65,36 @@ import org.mockito.stubbing.Answer;
 public class TestLocalContainerLauncher {
   private static final Log LOG =
       LogFactory.getLog(TestLocalContainerLauncher.class);
+  private static File testWorkDir;
+  private static final String[] localDirs = new String[2];
+
+  private static void delete(File dir) throws IOException {
+    // Recursively remove dir through the local Hadoop file system.
+    final FileSystem localFs = FileSystem.getLocal(new Configuration());
+    final Path target = new Path(dir.getAbsolutePath());
+    localFs.delete(localFs.makeQualified(target), true);
+  }
+
+  @BeforeClass
+  public static void setupTestDirs() throws IOException {
+    testWorkDir = new File("target",
+        TestLocalContainerLauncher.class.getCanonicalName()).getAbsoluteFile();
+    // File.delete() fails on a non-empty dir; clear it recursively instead.
+    delete(testWorkDir);
+    testWorkDir.mkdirs();
+    for (int i = 0; i < localDirs.length; i++) {
+      final File dir = new File(testWorkDir, "local-" + i);
+      dir.mkdirs();
+      localDirs[i] = dir.toString();
+    }
+  }
+
+  @AfterClass
+  public static void cleanupTestDirs() throws IOException {
+    if (testWorkDir != null) {
+      delete(testWorkDir);
+    }
+  }
 
   @SuppressWarnings("rawtypes")
   @Test(timeout=10000)
@@ -141,4 +183,35 @@ public class TestLocalContainerLauncher 
     when(container.getNodeId()).thenReturn(nodeId);
     return container;
   }
+
+
+  @Test
+  public void testRenameMapOutputForReduce() throws Exception {
+    final JobConf conf = new JobConf();
+
+    final MROutputFiles mrOutputFiles = new MROutputFiles();
+    mrOutputFiles.setConf(conf);
+
+    // make sure both dirs are distinct
+    //
+    conf.set(MRConfig.LOCAL_DIR, localDirs[0].toString());
+    final Path mapOut = mrOutputFiles.getOutputFileForWrite(1);
+    conf.set(MRConfig.LOCAL_DIR, localDirs[1].toString());
+    final Path mapOutIdx = mrOutputFiles.getOutputIndexFileForWrite(1);
+    Assert.assertNotEquals("Paths must be different!",
+        mapOut.getParent(), mapOutIdx.getParent());
+
+    // make both dirs part of LOCAL_DIR
+    conf.setStrings(MRConfig.LOCAL_DIR, localDirs);
+
+    final FileContext lfc = FileContext.getLocalFSFileContext(conf);
+    lfc.create(mapOut, EnumSet.of(CREATE)).close();
+    lfc.create(mapOutIdx, EnumSet.of(CREATE)).close();
+
+    final JobId jobId = MRBuilderUtils.newJobId(12345L, 1, 2);
+    final TaskId tid = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+    final TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 0);
+
+    LocalContainerLauncher.renameMapOutputForReduce(conf, taid, mrOutputFiles);
+  }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java Mon Jul 21 21:44:50 2014
@@ -78,6 +78,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -87,6 +88,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.Event;
@@ -95,9 +97,13 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -618,6 +624,10 @@ public class TestRMContainerAllocator {
       super(conf);
     }
 
+    public MyResourceManager(Configuration conf, RMStateStore store) {
+      super(conf, store);
+    }
+
     @Override
     public void serviceStart() throws Exception {
       super.serviceStart();
@@ -1426,6 +1436,13 @@ public class TestRMContainerAllocator {
         rm.getMyFifoScheduler().lastBlacklistRemovals.size());
   }
 
+  private static void assertAsksAndReleases(int expectedAsk,
+      int expectedRelease, MyResourceManager rm) {
+    Assert.assertEquals(expectedAsk, rm.getMyFifoScheduler().lastAsk.size());
+    Assert.assertEquals(expectedRelease,
+        rm.getMyFifoScheduler().lastRelease.size());
+  }
+
   private static class MyFifoScheduler extends FifoScheduler {
 
     public MyFifoScheduler(RMContext rmContext) {
@@ -1440,6 +1457,7 @@ public class TestRMContainerAllocator {
     }
     
     List<ResourceRequest> lastAsk = null;
+    List<ContainerId> lastRelease = null;
     List<String> lastBlacklistAdditions;
     List<String> lastBlacklistRemovals;
     
@@ -1458,6 +1476,7 @@ public class TestRMContainerAllocator {
         askCopy.add(reqCopy);
       }
       lastAsk = ask;
+      lastRelease = release;
       lastBlacklistAdditions = blacklistAdditions;
       lastBlacklistRemovals = blacklistRemovals;
       return super.allocate(
@@ -1505,6 +1524,20 @@ public class TestRMContainerAllocator {
     return new ContainerFailedEvent(attemptId, host);    
   }
   
+  /**
+   * Builds a CONTAINER_DEALLOCATE event for the given attempt number of
+   * task 0 of the job, as a reduce task if {@code reduce} is set, else a map.
+   */
+  private ContainerAllocatorEvent createDeallocateEvent(JobId jobId,
+      int taskAttemptId, boolean reduce) {
+    final TaskType type = reduce ? TaskType.REDUCE : TaskType.MAP;
+    final TaskId taskId = MRBuilderUtils.newTaskId(jobId, 0, type);
+    final TaskAttemptId attemptId =
+        MRBuilderUtils.newTaskAttemptId(taskId, taskAttemptId);
+    return new ContainerAllocatorEvent(attemptId,
+        ContainerAllocator.EventType.CONTAINER_DEALLOCATE);
+  }
+
   private void checkAssignments(ContainerRequestEvent[] requests,
       List<TaskAttemptContainerAssignedEvent> assignments,
       boolean checkHostMatch) {
@@ -1557,6 +1590,7 @@ public class TestRMContainerAllocator {
     = new ArrayList<JobUpdatedNodesEvent>();
     private MyResourceManager rm;
     private boolean isUnregistered = false;
+    private AllocateResponse allocateResponse;
     private static AppContext createAppContext(
         ApplicationAttemptId appAttemptId, Job job) {
       AppContext context = mock(AppContext.class);
@@ -1668,6 +1702,10 @@ public class TestRMContainerAllocator {
       super.handleEvent(f);
     }
     
+    public void sendDeallocate(ContainerAllocatorEvent f) {
+      super.handleEvent(f);
+    }
+
     // API to be used by tests
     public List<TaskAttemptContainerAssignedEvent> schedule()
         throws Exception {
@@ -1713,6 +1751,20 @@ public class TestRMContainerAllocator {
     public boolean isUnregistered() {
       return isUnregistered;
     }
+
+    public void updateSchedulerProxy(MyResourceManager rm) {
+      scheduler = rm.getApplicationMasterService();
+    }
+
+    @Override
+    protected AllocateResponse makeRemoteRequest() throws IOException {
+      allocateResponse = super.makeRemoteRequest();
+      return allocateResponse;
+    }
+
+    public boolean isResyncCommand() {
+      return super.isResyncCommand(allocateResponse);
+    }
   }
 
   @Test
@@ -2022,6 +2074,198 @@ public class TestRMContainerAllocator {
     Assert.assertTrue(allocator.isUnregistered());
   }
   
+  // Step-1 : AM sends allocate request for 2 ContainerRequests and 1
+  // blacklisted node
+  // Step-2 : 2 containers are allocated by RM.
+  // Step-3 : AM sends 1 containerRequest(event3) and 1 releaseRequest to
+  // RM
+  // Step-4 : On RM restart, AM(does not know RM is restarted) sends
+  // additional containerRequest(event4) and blacklisted nodes.
+  // In turn RM sends resync command
+  // Step-5 : On Resync, AM sends all outstanding
+  // asks, releases, blacklistAdditions
+  // and another containerRequest(event5)
+  // Step-6 : RM allocates containers i.e event3, event4 and cRequest5
+  @Test
+  public void testRMContainerAllocatorResendsRequestsOnRMRestart()
+      throws Exception {
+
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+    conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
+
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
+    conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+    conf.setInt(
+        MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    MyResourceManager rm1 = new MyResourceManager(conf, memStore);
+    rm1.start();
+    DrainDispatcher dispatcher =
+        (DrainDispatcher) rm1.getRMContext().getDispatcher();
+
+    // Submit the application
+    RMApp app = rm1.submitApp(1024);
+    dispatcher.await();
+
+    MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    nm1.nodeHeartbeat(true); // Node heartbeat
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId =
+        app.getCurrentAppAttempt().getAppAttemptId();
+    rm1.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    MyContainerAllocator allocator =
+        new MyContainerAllocator(rm1, conf, appAttemptId, mockJob);
+
+    // Step-1 : AM sends allocate request for 2 ContainerRequests and 1
+    // blacklisted node
+    // create the container request
+    // send MAP request
+    ContainerRequestEvent event1 =
+        createReq(jobId, 1, 1024, new String[] { "h1" });
+    allocator.sendRequest(event1);
+
+    ContainerRequestEvent event2 =
+        createReq(jobId, 2, 2048, new String[] { "h1", "h2" });
+    allocator.sendRequest(event2);
+
+    // Send events to blacklist h2
+    ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h2", false);
+    allocator.sendFailure(f1);
+
+    // send allocate request and 1 blacklisted nodes
+    List<TaskAttemptContainerAssignedEvent> assignedContainers =
+        allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0,
+        assignedContainers.size());
+    // Why ask is 3, not 4? --> ask from blacklisted node h2 is removed
+    assertAsksAndReleases(3, 0, rm1);
+    assertBlacklistAdditionsAndRemovals(1, 0, rm1);
+
+    nm1.nodeHeartbeat(true); // Node heartbeat
+    dispatcher.await();
+
+    // Step-2 : 2 containers are allocated by RM.
+    assignedContainers = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 2", 2,
+        assignedContainers.size());
+    assertAsksAndReleases(0, 0, rm1);
+    assertBlacklistAdditionsAndRemovals(0, 0, rm1);
+
+    assignedContainers = allocator.schedule();
+    Assert.assertEquals("No of assignments must be 0", 0,
+        assignedContainers.size());
+    assertAsksAndReleases(3, 0, rm1);
+    assertBlacklistAdditionsAndRemovals(0, 0, rm1);
+
+    // Step-3 : AM sends 1 containerRequest(event3) and 1 releaseRequest to
+    // RM
+    // send container request
+    ContainerRequestEvent event3 =
+        createReq(jobId, 3, 1000, new String[] { "h1" });
+    allocator.sendRequest(event3);
+
+    // send deallocate request
+    ContainerAllocatorEvent deallocate1 =
+        createDeallocateEvent(jobId, 1, false);
+    allocator.sendDeallocate(deallocate1);
+
+    assignedContainers = allocator.schedule();
+    Assert.assertEquals("No of assignments must be 0", 0,
+        assignedContainers.size());
+    assertAsksAndReleases(3, 1, rm1);
+    assertBlacklistAdditionsAndRemovals(0, 0, rm1);
+
+    // Phase-2 start 2nd RM is up
+    MyResourceManager rm2 = new MyResourceManager(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    allocator.updateSchedulerProxy(rm2);
+    dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
+
+    // nm1 has not registered with rm2, so its first heartbeat must get RESYNC
+    NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
+    Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction());
+
+    // new NM to represent NM re-register
+    nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService());
+    nm1.registerNode();
+    nm1.nodeHeartbeat(true);
+    dispatcher.await();
+
+    // Step-4 : On RM restart, AM(does not know RM is restarted) sends
+    // additional containerRequest(event4) and blacklisted nodes.
+    // In turn RM sends resync command
+
+    // send deallocate request, release=1
+    ContainerAllocatorEvent deallocate2 =
+        createDeallocateEvent(jobId, 2, false);
+    allocator.sendDeallocate(deallocate2);
+
+    // Send events to blacklist nodes h3
+    ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h3", false);
+    allocator.sendFailure(f2);
+
+    ContainerRequestEvent event4 =
+        createReq(jobId, 4, 2000, new String[] { "h1", "h2" });
+    allocator.sendRequest(event4);
+
+    // send allocate request to 2nd RM and get resync command
+    allocator.schedule();
+    dispatcher.await();
+    Assert.assertTrue("Last allocate response is not RESYNC",
+        allocator.isResyncCommand());
+
+    // Step-5 : On Resync, AM sends all outstanding
+    // asks, releases, blacklistAdditions
+    // and another containerRequest(event5)
+    ContainerRequestEvent event5 =
+        createReq(jobId, 5, 3000, new String[] { "h1", "h2", "h3" });
+    allocator.sendRequest(event5);
+
+    // send all outstanding request again.
+    assignedContainers = allocator.schedule();
+    dispatcher.await();
+    assertAsksAndReleases(3, 2, rm2);
+    assertBlacklistAdditionsAndRemovals(2, 0, rm2);
+
+    nm1.nodeHeartbeat(true);
+    dispatcher.await();
+
+    // Step-6 : RM allocates containers i.e event3,event4 and cRequest5
+    assignedContainers = allocator.schedule();
+    dispatcher.await();
+
+    Assert.assertEquals("Number of container should be 3", 3,
+        assignedContainers.size());
+
+    for (TaskAttemptContainerAssignedEvent assig : assignedContainers) {
+      Assert.assertTrue("Assigned count not correct",
+          "h1".equals(assig.getContainer().getNodeId().getHost()));
+    }
+
+    rm1.stop();
+    rm2.stop();
+
+  }
+
   public static void main(String[] args) throws Exception {
     TestRMContainerAllocator t = new TestRMContainerAllocator();
     t.testSimple();

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Mon Jul 21 21:44:50 2014
@@ -327,8 +327,8 @@ public class MRApps extends Apps {
   }
 
   /**
-   * Sets a {@link ApplicationClassLoader} on the given configuration and as
-   * the context classloader, if
+   * Creates and sets a {@link ApplicationClassLoader} on the given
+   * configuration and as the thread context classloader, if
    * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and
    * the APP_CLASSPATH environment variable is set.
    * @param conf
@@ -336,24 +336,52 @@ public class MRApps extends Apps {
    */
   public static void setJobClassLoader(Configuration conf)
       throws IOException {
+    setClassLoader(createJobClassLoader(conf), conf);
+  }
+
+  /**
+   * Creates an {@link ApplicationClassLoader} if
+   * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and
+   * the APP_CLASSPATH environment variable is set.
+   * @param conf
+   * @return the created job classloader, or null if the job classloader is not
+   * enabled or the APP_CLASSPATH environment variable is not set
+   * @throws IOException
+   */
+  public static ClassLoader createJobClassLoader(Configuration conf)
+      throws IOException {
+    ClassLoader jobClassLoader = null;
     if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)) {
       String appClasspath = System.getenv(Environment.APP_CLASSPATH.key());
       if (appClasspath == null) {
-        LOG.warn("Not using job classloader since APP_CLASSPATH is not set.");
+        LOG.warn("Not creating job classloader since APP_CLASSPATH is not set.");
       } else {
-        LOG.info("Using job classloader");
+        LOG.info("Creating job classloader");
         if (LOG.isDebugEnabled()) {
           LOG.debug("APP_CLASSPATH=" + appClasspath);
         }
         String[] systemClasses = getSystemClasses(conf);
-        ClassLoader jobClassLoader = createJobClassLoader(appClasspath,
+        jobClassLoader = createJobClassLoader(appClasspath,
             systemClasses);
-        if (jobClassLoader != null) {
-          conf.setClassLoader(jobClassLoader);
-          Thread.currentThread().setContextClassLoader(jobClassLoader);
-        }
       }
     }
+    return jobClassLoader;
+  }
+
+  /**
+   * Sets the provided classloader on the given configuration and as the thread
+   * context classloader if the classloader is not null.
+   * @param classLoader
+   * @param conf
+   */
+  public static void setClassLoader(ClassLoader classLoader,
+      Configuration conf) {
+    if (classLoader != null) {
+      LOG.info("Setting classloader " + classLoader.getClass().getName() +
+          " on the configuration and as the thread context classloader");
+      conf.setClassLoader(classLoader);
+      Thread.currentThread().setContextClassLoader(classLoader);
+    }
   }
 
   @VisibleForTesting

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java Mon Jul 21 21:44:50 2014
@@ -579,7 +579,7 @@ public abstract class CombineFileInputFo
         blocks = new OneBlockInfo[0];
       } else {
 
-        if(locations.length == 0) {
+        if(locations.length == 0 && !stat.isDirectory()) {
           locations = new BlockLocation[] { new BlockLocation() };
         }
 

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java Mon Jul 21 21:44:50 2014
@@ -1275,6 +1275,57 @@ public class TestCombineFileInputFormat 
   }
 
   /**
+   * Test that directories do not get included as part of getSplits()
+   */
+  @Test
+  public void testGetSplitsWithDirectory() throws Exception {
+    MiniDFSCluster dfs = null;
+    try {
+      Configuration conf = new Configuration();
+      dfs = new MiniDFSCluster.Builder(conf).racks(rack1).hosts(hosts1)
+          .build();
+      dfs.waitActive();
+
+      FileSystem fileSys = dfs.getFileSystem();
+
+      // Set up the following directory structure:
+      // /dir1/: directory
+      // /dir1/file1: regular file
+      // /dir1/dir2/: directory
+      Path dir1 = new Path("/dir1");
+      Path file = new Path("/dir1/file1");
+      Path dir2 = new Path("/dir1/dir2");
+      if (!fileSys.mkdirs(dir1)) {
+        throw new IOException("Mkdirs failed to create " + dir1.toString());
+      }
+      FSDataOutputStream out = fileSys.create(file);
+      out.write(new byte[0]);
+      out.close();
+      if (!fileSys.mkdirs(dir2)) {
+        throw new IOException("Mkdirs failed to create " + dir2.toString());
+      }
+
+      // compute splits using a CombineFileInputFormat implementation
+      DummyInputFormat inFormat = new DummyInputFormat();
+      Job job = Job.getInstance(conf);
+      FileInputFormat.setInputPaths(job, "/dir1");
+      List<InputSplit> splits = inFormat.getSplits(job);
+
+      // directories should be omitted from getSplits() - we should only see file1 and not dir2
+      assertEquals(1, splits.size());
+      CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(file.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(0, fileSplit.getLength(0));
+    } finally {
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+    }
+  }
+
+  /**
    * Test when input files are from non-default file systems
    */
   @Test

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Mon Jul 21 21:44:50 2014
@@ -33,8 +33,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.jar.JarOutputStream;
 import java.util.zip.ZipEntry;
-import org.apache.commons.io.FileUtils;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.FailingMapper;
@@ -77,6 +77,10 @@ import org.apache.hadoop.mapreduce.lib.o
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
+import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -86,6 +90,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.util.ApplicationClassLoader;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.log4j.Level;
 import org.junit.AfterClass;
@@ -210,7 +215,19 @@ public class TestMRJobs {
   @Test(timeout = 300000)
   public void testJobClassloader() throws IOException, InterruptedException,
       ClassNotFoundException {
-    LOG.info("\n\n\nStarting testJobClassloader().");
+    testJobClassloader(false);
+  }
+
+  @Test(timeout = 300000)
+  public void testJobClassloaderWithCustomClasses() throws IOException,
+      InterruptedException, ClassNotFoundException {
+    testJobClassloader(true);
+  }
+
+  private void testJobClassloader(boolean useCustomClasses) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    LOG.info("\n\n\nStarting testJobClassloader()"
+        + " useCustomClasses=" + useCustomClasses);
 
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
       LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
@@ -221,6 +238,19 @@ public class TestMRJobs {
     // set master address to local to test that local mode applied iff framework == local
     sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
     sleepConf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
+    if (useCustomClasses) {
+      // to test AM loading user classes such as output format class, we want
+      // to blacklist them from the system classes (they need to be prepended
+      // as the first match wins)
+      String systemClasses =
+          sleepConf.get(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES);
+      // exclude the custom classes from system classes
+      systemClasses = "-" + CustomOutputFormat.class.getName() + ",-" +
+          CustomSpeculator.class.getName() + "," +
+          systemClasses;
+      sleepConf.set(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES,
+          systemClasses);
+    }
     sleepConf.set(MRJobConfig.IO_SORT_MB, TEST_IO_SORT_MB);
     sleepConf.set(MRJobConfig.MR_AM_LOG_LEVEL, Level.ALL.toString());
     sleepConf.set(MRJobConfig.MAP_LOG_LEVEL, Level.ALL.toString());
@@ -233,12 +263,66 @@ public class TestMRJobs {
     job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
     job.setJarByClass(SleepJob.class);
     job.setMaxMapAttempts(1); // speed up failures
+    if (useCustomClasses) {
+      // set custom output format class and speculator class
+      job.setOutputFormatClass(CustomOutputFormat.class);
+      final Configuration jobConf = job.getConfiguration();
+      jobConf.setClass(MRJobConfig.MR_AM_JOB_SPECULATOR, CustomSpeculator.class,
+          Speculator.class);
+      // speculation needs to be enabled for the speculator to be loaded
+      jobConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, true);
+    }
     job.submit();
     boolean succeeded = job.waitForCompletion(true);
     Assert.assertTrue("Job status: " + job.getStatus().getFailureInfo(),
         succeeded);
   }
 
+  public static class CustomOutputFormat<K,V> extends NullOutputFormat<K,V> {
+    public CustomOutputFormat() {
+      verifyClassLoader(getClass());
+    }
+
+    /**
+     * Verifies that the class was loaded by the job classloader if it is in the
+     * context of the MRAppMaster, and if not throws an exception to fail the
+     * job.
+     */
+    private void verifyClassLoader(Class<?> cls) {
+      // to detect that it is instantiated in the context of the MRAppMaster, we
+      // inspect the stack trace and check whether any caller is MRAppMaster
+      for (StackTraceElement e: new Throwable().getStackTrace()) {
+        if (e.getClassName().equals(MRAppMaster.class.getName()) &&
+            !(cls.getClassLoader() instanceof ApplicationClassLoader)) {
+          throw new ExceptionInInitializerError("incorrect classloader used");
+        }
+      }
+    }
+  }
+
+  public static class CustomSpeculator extends DefaultSpeculator {
+    public CustomSpeculator(Configuration conf, AppContext context) {
+      super(conf, context);
+      verifyClassLoader(getClass());
+    }
+
+    /**
+     * Verifies that the class was loaded by the job classloader if it is in the
+     * context of the MRAppMaster, and if not throws an exception to fail the
+     * job.
+     */
+    private void verifyClassLoader(Class<?> cls) {
+      // to detect that it is instantiated in the context of the MRAppMaster, we
+      // inspect the stack trace and check whether any caller is MRAppMaster
+      for (StackTraceElement e: new Throwable().getStackTrace()) {
+        if (e.getClassName().equals(MRAppMaster.class.getName()) &&
+            !(cls.getClassLoader() instanceof ApplicationClassLoader)) {
+          throw new ExceptionInInitializerError("incorrect classloader used");
+        }
+      }
+    }
+  }
+
   protected void verifySleepJobCounters(Job job) throws InterruptedException,
       IOException {
     Counters counters = job.getCounters();