You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2014/07/23 03:47:40 UTC
svn commit: r1612742 - in
/hadoop/common/branches/MR-2841/hadoop-mapreduce-project: ./ bin/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/or...
Author: todd
Date: Wed Jul 23 01:47:28 2014
New Revision: 1612742
URL: http://svn.apache.org/r1612742
Log:
Merge trunk into branch
Modified:
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/ (props changed)
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/CHANGES.txt (contents, props changed)
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
Propchange: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1610815-1612740
Modified: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/CHANGES.txt?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/CHANGES.txt Wed Jul 23 01:47:28 2014
@@ -17,6 +17,9 @@ Trunk (Unreleased)
MAPREDUCE-5232. Add a configuration to be able to log classpath and other
system properties on mapreduce JVMs startup. (Sangjin Lee via vinodkv)
+ MAPREDUCE-5910. Make MR AM resync with RM in case of work-preserving
+ RM-restart. (Rohith via jianhe)
+
IMPROVEMENTS
MAPREDUCE-3481. [Gridmix] Improve Gridmix STRESS mode. (amarrk)
@@ -153,6 +156,12 @@ Release 2.6.0 - UNRELEASED
IMPROVEMENTS
+ MAPREDUCE-5971. Move the default options for distcp -p to
+ DistCpOptionSwitch. (clamb via wang)
+
+ MAPREDUCE-5963. ShuffleHandler DB schema should be versioned with
+ compatible/incompatible changes (Junping Du via jlowe)
+
OPTIMIZATIONS
BUG FIXES
@@ -163,6 +172,12 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-5956. Made MR AM not use maxAttempts to determine if the current
attempt is the last retry. (Wangda Tan via zjshen)
+ MAPREDUCE-5957. AM throws ClassNotFoundException with job classloader
+ enabled if custom output format/committer is used (Sangjin Lee via jlowe)
+
+ MAPREDUCE-5756. CombineFileInputFormat.getSplits() including directories
+ in its results (Jason Dere via jlowe)
+
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES
@@ -307,6 +322,9 @@ Release 2.5.0 - UNRELEASED
resource configuration for deciding uber-mode on map-only jobs. (Siqi Li via
vinodkv)
+ MAPREDUCE-5952. LocalContainerLauncher#renameMapOutputForReduce incorrectly
+ assumes a single dir for mapOutIndex. (Gera Shegalov via kasha)
+
Release 2.4.1 - 2014-06-23
INCOMPATIBLE CHANGES
Propchange: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1610815-1612740
Modified: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh (original)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh Wed Jul 23 01:47:28 2014
@@ -133,6 +133,7 @@ case $startStop in
else
echo no $command to stop
fi
+ rm -f $pid
else
echo no $command to stop
fi
Modified: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java Wed Jul 23 01:47:28 2014
@@ -30,6 +30,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSError;
@@ -438,43 +439,6 @@ public class LocalContainerLauncher exte
}
/**
- * Within the _local_ filesystem (not HDFS), all activity takes place within
- * a single subdir (${local.dir}/usercache/$user/appcache/$appId/$contId/),
- * and all sub-MapTasks create the same filename ("file.out"). Rename that
- * to something unique (e.g., "map_0.out") to avoid collisions.
- *
- * Longer-term, we'll modify [something] to use TaskAttemptID-based
- * filenames instead of "file.out". (All of this is entirely internal,
- * so there are no particular compatibility issues.)
- */
- private MapOutputFile renameMapOutputForReduce(JobConf conf,
- TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
- FileSystem localFs = FileSystem.getLocal(conf);
- // move map output to reduce input
- Path mapOut = subMapOutputFile.getOutputFile();
- FileStatus mStatus = localFs.getFileStatus(mapOut);
- Path reduceIn = subMapOutputFile.getInputFileForWrite(
- TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
- Path mapOutIndex = new Path(mapOut.toString() + ".index");
- Path reduceInIndex = new Path(reduceIn.toString() + ".index");
- if (LOG.isDebugEnabled()) {
- LOG.debug("Renaming map output file for task attempt "
- + mapId.toString() + " from original location " + mapOut.toString()
- + " to destination " + reduceIn.toString());
- }
- if (!localFs.mkdirs(reduceIn.getParent())) {
- throw new IOException("Mkdirs failed to create "
- + reduceIn.getParent().toString());
- }
- if (!localFs.rename(mapOut, reduceIn))
- throw new IOException("Couldn't rename " + mapOut);
- if (!localFs.rename(mapOutIndex, reduceInIndex))
- throw new IOException("Couldn't rename " + mapOutIndex);
-
- return new RenamedMapOutputFile(reduceIn);
- }
-
- /**
* Also within the local filesystem, we need to restore the initial state
* of the directory as much as possible. Compare current contents against
* the saved original state and nuke everything that doesn't belong, with
@@ -506,7 +470,46 @@ public class LocalContainerLauncher exte
}
} // end EventHandler
-
+
+ /**
+ * Within the _local_ filesystem (not HDFS), all activity takes place within
+ * a subdir inside one of the LOCAL_DIRS
+ * (${local.dir}/usercache/$user/appcache/$appId/$contId/),
+ * and all sub-MapTasks create the same filename ("file.out"). Rename that
+ * to something unique (e.g., "map_0.out") to avoid possible collisions.
+ *
+ * Longer-term, we'll modify [something] to use TaskAttemptID-based
+ * filenames instead of "file.out". (All of this is entirely internal,
+ * so there are no particular compatibility issues.)
+ */
+ @VisibleForTesting
+ protected static MapOutputFile renameMapOutputForReduce(JobConf conf,
+ TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
+ FileSystem localFs = FileSystem.getLocal(conf);
+ // move map output to reduce input
+ Path mapOut = subMapOutputFile.getOutputFile();
+ FileStatus mStatus = localFs.getFileStatus(mapOut);
+ Path reduceIn = subMapOutputFile.getInputFileForWrite(
+ TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
+ Path mapOutIndex = subMapOutputFile.getOutputIndexFile();
+ Path reduceInIndex = new Path(reduceIn.toString() + ".index");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Renaming map output file for task attempt "
+ + mapId.toString() + " from original location " + mapOut.toString()
+ + " to destination " + reduceIn.toString());
+ }
+ if (!localFs.mkdirs(reduceIn.getParent())) {
+ throw new IOException("Mkdirs failed to create "
+ + reduceIn.getParent().toString());
+ }
+ if (!localFs.rename(mapOut, reduceIn))
+ throw new IOException("Couldn't rename " + mapOut);
+ if (!localFs.rename(mapOutIndex, reduceInIndex))
+ throw new IOException("Couldn't rename " + mapOutIndex);
+
+ return new RenamedMapOutputFile(reduceIn);
+ }
+
private static class RenamedMapOutputFile extends MapOutputFile {
private Path path;
Modified: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Wed Jul 23 01:47:28 2014
@@ -41,7 +41,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LocalContainerLauncher;
@@ -200,6 +199,7 @@ public class MRAppMaster extends Composi
new JobTokenSecretManager();
private JobId jobId;
private boolean newApiCommitter;
+ private ClassLoader jobClassLoader;
private OutputCommitter committer;
private JobEventDispatcher jobEventDispatcher;
private JobHistoryEventHandler jobHistoryEventHandler;
@@ -250,6 +250,9 @@ public class MRAppMaster extends Composi
@Override
protected void serviceInit(final Configuration conf) throws Exception {
+ // create the job classloader if enabled
+ createJobClassLoader(conf);
+
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
initJobCredentialsAndUGI(conf);
@@ -387,8 +390,13 @@ public class MRAppMaster extends Composi
addIfService(committerEventHandler);
//policy handling preemption requests from RM
- preemptionPolicy = createPreemptionPolicy(conf);
- preemptionPolicy.init(context);
+ callWithJobClassLoader(conf, new Action<Void>() {
+ public Void call(Configuration conf) {
+ preemptionPolicy = createPreemptionPolicy(conf);
+ preemptionPolicy.init(context);
+ return null;
+ }
+ });
//service to handle requests to TaskUmbilicalProtocol
taskAttemptListener = createTaskAttemptListener(context, preemptionPolicy);
@@ -453,33 +461,37 @@ public class MRAppMaster extends Composi
}
private OutputCommitter createOutputCommitter(Configuration conf) {
- OutputCommitter committer = null;
-
- LOG.info("OutputCommitter set in config "
- + conf.get("mapred.output.committer.class"));
-
- if (newApiCommitter) {
- org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID = MRBuilderUtils
- .newTaskId(jobId, 0, TaskType.MAP);
- org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = MRBuilderUtils
- .newTaskAttemptId(taskID, 0);
- TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
- TypeConverter.fromYarn(attemptID));
- OutputFormat outputFormat;
- try {
- outputFormat = ReflectionUtils.newInstance(taskContext
- .getOutputFormatClass(), conf);
- committer = outputFormat.getOutputCommitter(taskContext);
- } catch (Exception e) {
- throw new YarnRuntimeException(e);
+ return callWithJobClassLoader(conf, new Action<OutputCommitter>() {
+ public OutputCommitter call(Configuration conf) {
+ OutputCommitter committer = null;
+
+ LOG.info("OutputCommitter set in config "
+ + conf.get("mapred.output.committer.class"));
+
+ if (newApiCommitter) {
+ org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID =
+ MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+ org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
+ MRBuilderUtils.newTaskAttemptId(taskID, 0);
+ TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
+ TypeConverter.fromYarn(attemptID));
+ OutputFormat outputFormat;
+ try {
+ outputFormat = ReflectionUtils.newInstance(taskContext
+ .getOutputFormatClass(), conf);
+ committer = outputFormat.getOutputCommitter(taskContext);
+ } catch (Exception e) {
+ throw new YarnRuntimeException(e);
+ }
+ } else {
+ committer = ReflectionUtils.newInstance(conf.getClass(
+ "mapred.output.committer.class", FileOutputCommitter.class,
+ org.apache.hadoop.mapred.OutputCommitter.class), conf);
+ }
+ LOG.info("OutputCommitter is " + committer.getClass().getName());
+ return committer;
}
- } else {
- committer = ReflectionUtils.newInstance(conf.getClass(
- "mapred.output.committer.class", FileOutputCommitter.class,
- org.apache.hadoop.mapred.OutputCommitter.class), conf);
- }
- LOG.info("OutputCommitter is " + committer.getClass().getName());
- return committer;
+ });
}
protected AMPreemptionPolicy createPreemptionPolicy(Configuration conf) {
@@ -667,38 +679,42 @@ public class MRAppMaster extends Composi
return new StagingDirCleaningService();
}
- protected Speculator createSpeculator(Configuration conf, AppContext context) {
- Class<? extends Speculator> speculatorClass;
-
- try {
- speculatorClass
- // "yarn.mapreduce.job.speculator.class"
- = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR,
- DefaultSpeculator.class,
- Speculator.class);
- Constructor<? extends Speculator> speculatorConstructor
- = speculatorClass.getConstructor
- (Configuration.class, AppContext.class);
- Speculator result = speculatorConstructor.newInstance(conf, context);
-
- return result;
- } catch (InstantiationException ex) {
- LOG.error("Can't make a speculator -- check "
- + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
- throw new YarnRuntimeException(ex);
- } catch (IllegalAccessException ex) {
- LOG.error("Can't make a speculator -- check "
- + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
- throw new YarnRuntimeException(ex);
- } catch (InvocationTargetException ex) {
- LOG.error("Can't make a speculator -- check "
- + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
- throw new YarnRuntimeException(ex);
- } catch (NoSuchMethodException ex) {
- LOG.error("Can't make a speculator -- check "
- + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
- throw new YarnRuntimeException(ex);
- }
+ protected Speculator createSpeculator(Configuration conf,
+ final AppContext context) {
+ return callWithJobClassLoader(conf, new Action<Speculator>() {
+ public Speculator call(Configuration conf) {
+ Class<? extends Speculator> speculatorClass;
+ try {
+ speculatorClass
+ // "yarn.mapreduce.job.speculator.class"
+ = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR,
+ DefaultSpeculator.class,
+ Speculator.class);
+ Constructor<? extends Speculator> speculatorConstructor
+ = speculatorClass.getConstructor
+ (Configuration.class, AppContext.class);
+ Speculator result = speculatorConstructor.newInstance(conf, context);
+
+ return result;
+ } catch (InstantiationException ex) {
+ LOG.error("Can't make a speculator -- check "
+ + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+ throw new YarnRuntimeException(ex);
+ } catch (IllegalAccessException ex) {
+ LOG.error("Can't make a speculator -- check "
+ + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+ throw new YarnRuntimeException(ex);
+ } catch (InvocationTargetException ex) {
+ LOG.error("Can't make a speculator -- check "
+ + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+ throw new YarnRuntimeException(ex);
+ } catch (NoSuchMethodException ex) {
+ LOG.error("Can't make a speculator -- check "
+ + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+ throw new YarnRuntimeException(ex);
+ }
+ }
+ });
}
protected TaskAttemptListener createTaskAttemptListener(AppContext context,
@@ -712,7 +728,7 @@ public class MRAppMaster extends Composi
protected EventHandler<CommitterEvent> createCommitterEventHandler(
AppContext context, OutputCommitter committer) {
return new CommitterEventHandler(context, committer,
- getRMHeartbeatHandler());
+ getRMHeartbeatHandler(), jobClassLoader);
}
protected ContainerAllocator createContainerAllocator(
@@ -1083,8 +1099,8 @@ public class MRAppMaster extends Composi
//start all the components
super.serviceStart();
- // set job classloader if configured
- MRApps.setJobClassLoader(getConfig());
+ // finally set the job classloader
+ MRApps.setClassLoader(jobClassLoader, getConfig());
if (initFailed) {
JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
@@ -1101,19 +1117,24 @@ public class MRAppMaster extends Composi
TaskLog.syncLogsShutdown(logSyncer);
}
- private boolean isRecoverySupported(OutputCommitter committer2)
- throws IOException {
+ private boolean isRecoverySupported() throws IOException {
boolean isSupported = false;
- JobContext _jobContext;
+ Configuration conf = getConfig();
if (committer != null) {
+ final JobContext _jobContext;
if (newApiCommitter) {
_jobContext = new JobContextImpl(
- getConfig(), TypeConverter.fromYarn(getJobId()));
+ conf, TypeConverter.fromYarn(getJobId()));
} else {
_jobContext = new org.apache.hadoop.mapred.JobContextImpl(
- new JobConf(getConfig()), TypeConverter.fromYarn(getJobId()));
+ new JobConf(conf), TypeConverter.fromYarn(getJobId()));
}
- isSupported = committer.isRecoverySupported(_jobContext);
+ isSupported = callWithJobClassLoader(conf,
+ new ExceptionAction<Boolean>() {
+ public Boolean call(Configuration conf) throws IOException {
+ return committer.isRecoverySupported(_jobContext);
+ }
+ });
}
return isSupported;
}
@@ -1127,7 +1148,7 @@ public class MRAppMaster extends Composi
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);
- boolean recoverySupportedByCommitter = isRecoverySupported(committer);
+ boolean recoverySupportedByCommitter = isRecoverySupported();
// If a shuffle secret was not provided by the job client then this app
// attempt will generate one. However that disables recovery if there
@@ -1312,7 +1333,7 @@ public class MRAppMaster extends Composi
this.conf = config;
}
@Override
- public void handle(SpeculatorEvent event) {
+ public void handle(final SpeculatorEvent event) {
if (disabled) {
return;
}
@@ -1339,7 +1360,12 @@ public class MRAppMaster extends Composi
if ( (shouldMapSpec && (tType == null || tType == TaskType.MAP))
|| (shouldReduceSpec && (tType == null || tType == TaskType.REDUCE))) {
// Speculator IS enabled, direct the event to there.
- speculator.handle(event);
+ callWithJobClassLoader(conf, new Action<Void>() {
+ public Void call(Configuration conf) {
+ speculator.handle(event);
+ return null;
+ }
+ });
}
}
@@ -1499,6 +1525,102 @@ public class MRAppMaster extends Composi
});
}
+ /**
+ * Creates a job classloader based on the configuration if the job classloader
+ * is enabled. It is a no-op if the job classloader is not enabled.
+ */
+ private void createJobClassLoader(Configuration conf) throws IOException {
+ jobClassLoader = MRApps.createJobClassLoader(conf);
+ }
+
+ /**
+ * Executes the given action with the job classloader set as the configuration
+ * classloader as well as the thread context class loader if the job
+ * classloader is enabled. After the call, the original classloader is
+ * restored.
+ *
+ * If the job classloader is enabled and the code needs to load user-supplied
+ * classes via configuration or thread context classloader, this method should
+ * be used in order to load them.
+ *
+ * @param conf the configuration on which the classloader will be set
+ * @param action the callable action to be executed
+ */
+ <T> T callWithJobClassLoader(Configuration conf, Action<T> action) {
+ // if the job classloader is enabled, we may need it to load the (custom)
+ // classes; we make the job classloader available and unset it once it is
+ // done
+ ClassLoader currentClassLoader = conf.getClassLoader();
+ boolean setJobClassLoader =
+ jobClassLoader != null && currentClassLoader != jobClassLoader;
+ if (setJobClassLoader) {
+ MRApps.setClassLoader(jobClassLoader, conf);
+ }
+ try {
+ return action.call(conf);
+ } finally {
+ if (setJobClassLoader) {
+ // restore the original classloader
+ MRApps.setClassLoader(currentClassLoader, conf);
+ }
+ }
+ }
+
+ /**
+ * Executes the given action that can throw a checked exception with the job
+ * classloader set as the configuration classloader as well as the thread
+ * context class loader if the job classloader is enabled. After the call, the
+ * original classloader is restored.
+ *
+ * If the job classloader is enabled and the code needs to load user-supplied
+ * classes via configuration or thread context classloader, this method should
+ * be used in order to load them.
+ *
+ * @param conf the configuration on which the classloader will be set
+ * @param action the callable action to be executed
+ * @throws IOException if the underlying action throws an IOException
+ * @throws YarnRuntimeException if the underlying action throws an exception
+ * other than an IOException
+ */
+ <T> T callWithJobClassLoader(Configuration conf, ExceptionAction<T> action)
+ throws IOException {
+ // if the job classloader is enabled, we may need it to load the (custom)
+ // classes; we make the job classloader available and unset it once it is
+ // done
+ ClassLoader currentClassLoader = conf.getClassLoader();
+ boolean setJobClassLoader =
+ jobClassLoader != null && currentClassLoader != jobClassLoader;
+ if (setJobClassLoader) {
+ MRApps.setClassLoader(jobClassLoader, conf);
+ }
+ try {
+ return action.call(conf);
+ } catch (IOException e) {
+ throw e;
+ } catch (YarnRuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ // wrap it with a YarnRuntimeException
+ throw new YarnRuntimeException(e);
+ } finally {
+ if (setJobClassLoader) {
+ // restore the original classloader
+ MRApps.setClassLoader(currentClassLoader, conf);
+ }
+ }
+ }
+
+ /**
+ * Action to be wrapped with setting and unsetting the job classloader
+ */
+ private static interface Action<T> {
+ T call(Configuration conf);
+ }
+
+ private static interface ExceptionAction<T> {
+ T call(Configuration conf) throws Exception;
+ }
+
@Override
protected void serviceStop() throws Exception {
super.serviceStop();
Modified: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java Wed Jul 23 01:47:28 2014
@@ -68,6 +68,7 @@ public class CommitterEventHandler exten
private BlockingQueue<CommitterEvent> eventQueue =
new LinkedBlockingQueue<CommitterEvent>();
private final AtomicBoolean stopped;
+ private final ClassLoader jobClassLoader;
private Thread jobCommitThread = null;
private int commitThreadCancelTimeoutMs;
private long commitWindowMs;
@@ -79,11 +80,17 @@ public class CommitterEventHandler exten
public CommitterEventHandler(AppContext context, OutputCommitter committer,
RMHeartbeatHandler rmHeartbeatHandler) {
+ this(context, committer, rmHeartbeatHandler, null);
+ }
+
+ public CommitterEventHandler(AppContext context, OutputCommitter committer,
+ RMHeartbeatHandler rmHeartbeatHandler, ClassLoader jobClassLoader) {
super("CommitterEventHandler");
this.context = context;
this.committer = committer;
this.rmHeartbeatHandler = rmHeartbeatHandler;
this.stopped = new AtomicBoolean(false);
+ this.jobClassLoader = jobClassLoader;
}
@Override
@@ -109,9 +116,23 @@ public class CommitterEventHandler exten
@Override
protected void serviceStart() throws Exception {
- ThreadFactory tf = new ThreadFactoryBuilder()
- .setNameFormat("CommitterEvent Processor #%d")
- .build();
+ ThreadFactoryBuilder tfBuilder = new ThreadFactoryBuilder()
+ .setNameFormat("CommitterEvent Processor #%d");
+ if (jobClassLoader != null) {
+ // if the job classloader is enabled, we need to use the job classloader
+ // as the thread context classloader (TCCL) of these threads in case the
+ // committer needs to load another class via TCCL
+ ThreadFactory backingTf = new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r);
+ thread.setContextClassLoader(jobClassLoader);
+ return thread;
+ }
+ };
+ tfBuilder.setThreadFactory(backingTf);
+ }
+ ThreadFactory tf = tfBuilder.build();
launcherPool = new ThreadPoolExecutor(5, 5, 1,
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
eventHandlingThread = new Thread(new Runnable() {
Modified: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Wed Jul 23 01:47:28 2014
@@ -64,6 +64,7 @@ public class LocalContainerAllocator ext
private int nmPort;
private int nmHttpPort;
private ContainerId containerId;
+ protected int lastResponseID;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@@ -119,6 +120,11 @@ public class LocalContainerAllocator ext
if (allocateResponse.getAMCommand() != null) {
switch(allocateResponse.getAMCommand()) {
case AM_RESYNC:
+ LOG.info("ApplicationMaster is out of sync with ResourceManager,"
+ + " hence resyncing.");
+ this.lastResponseID = 0;
+ register();
+ break;
case AM_SHUTDOWN:
LOG.info("Event from RM: shutting down Application Master");
// This can happen if the RM has been restarted. If it is in that state,
Modified: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Wed Jul 23 01:47:28 2014
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -216,20 +217,27 @@ public abstract class RMCommunicator ext
FinishApplicationMasterRequest request =
FinishApplicationMasterRequest.newInstance(finishState,
sb.toString(), historyUrl);
- while (true) {
- FinishApplicationMasterResponse response =
- scheduler.finishApplicationMaster(request);
- if (response.getIsUnregistered()) {
- // When excepting ClientService, other services are already stopped,
- // it is safe to let clients know the final states. ClientService
- // should wait for some time so clients have enough time to know the
- // final states.
- RunningAppContext raContext = (RunningAppContext) context;
- raContext.markSuccessfulUnregistration();
- break;
+ try {
+ while (true) {
+ FinishApplicationMasterResponse response =
+ scheduler.finishApplicationMaster(request);
+ if (response.getIsUnregistered()) {
+ // When excepting ClientService, other services are already stopped,
+ // it is safe to let clients know the final states. ClientService
+ // should wait for some time so clients have enough time to know the
+ // final states.
+ RunningAppContext raContext = (RunningAppContext) context;
+ raContext.markSuccessfulUnregistration();
+ break;
+ }
+ LOG.info("Waiting for application to be successfully unregistered.");
+ Thread.sleep(rmPollInterval);
}
- LOG.info("Waiting for application to be successfully unregistered.");
- Thread.sleep(rmPollInterval);
+ } catch (ApplicationMasterNotRegisteredException e) {
+ // RM might have restarted or failed over and so lost the fact that AM had
+ // registered before.
+ register();
+ doUnregistration();
}
}
Modified: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Wed Jul 23 01:47:28 2014
@@ -389,6 +389,7 @@ public class RMContainerAllocator extend
removed = true;
assignedRequests.remove(aId);
containersReleased++;
+ pendingRelease.add(containerId);
release(containerId);
}
}
@@ -641,6 +642,15 @@ public class RMContainerAllocator extend
if (response.getAMCommand() != null) {
switch(response.getAMCommand()) {
case AM_RESYNC:
+ LOG.info("ApplicationMaster is out of sync with ResourceManager,"
+ + " hence resyncing.");
+ lastResponseID = 0;
+
+ // Registering to allow RM to discover an active AM for this
+ // application
+ register();
+ addOutstandingRequestOnResync();
+ break;
case AM_SHUTDOWN:
// This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up.
@@ -700,6 +710,7 @@ public class RMContainerAllocator extend
LOG.error("Container complete event for unknown container id "
+ cont.getContainerId());
} else {
+ pendingRelease.remove(cont.getContainerId());
assignedRequests.remove(attemptID);
// send the container completed event to Task attempt
@@ -991,6 +1002,7 @@ public class RMContainerAllocator extend
private void containerNotAssigned(Container allocated) {
containersReleased++;
+ pendingRelease.add(allocated.getId());
release(allocated.getId());
}
Modified: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Wed Jul 23 01:47:28 2014
@@ -40,6 +40,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -58,7 +59,7 @@ public abstract class RMContainerRequest
private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class);
- private int lastResponseID;
+ protected int lastResponseID;
private Resource availableResources;
private final RecordFactory recordFactory =
@@ -77,8 +78,11 @@ public abstract class RMContainerRequest
// numContainers dont end up as duplicates
private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
- private final Set<ContainerId> release = new TreeSet<ContainerId>();
-
+ private final Set<ContainerId> release = new TreeSet<ContainerId>();
+ // pendingRelease holds the history of release requests. A request is removed
+ // only when RM sends a completedContainer.
+ // How is it different from release? --> release is per allocate() request.
+ protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();
private boolean nodeBlacklistingEnabled;
private int blacklistDisablePercent;
private AtomicBoolean ignoreBlacklisting = new AtomicBoolean(false);
@@ -186,6 +190,10 @@ public abstract class RMContainerRequest
} catch (YarnException e) {
throw new IOException(e);
}
+
+ if (isResyncCommand(allocateResponse)) {
+ return allocateResponse;
+ }
lastResponseID = allocateResponse.getResponseId();
availableResources = allocateResponse.getAvailableResources();
lastClusterNmCount = clusterNmCount;
@@ -214,6 +222,28 @@ public abstract class RMContainerRequest
return allocateResponse;
}
+ protected boolean isResyncCommand(AllocateResponse allocateResponse) {
+ return allocateResponse.getAMCommand() != null
+ && allocateResponse.getAMCommand() == AMCommand.AM_RESYNC;
+ }
+
+ protected void addOutstandingRequestOnResync() {
+ for (Map<String, Map<Resource, ResourceRequest>> rr : remoteRequestsTable
+ .values()) {
+ for (Map<Resource, ResourceRequest> capabalities : rr.values()) {
+ for (ResourceRequest request : capabalities.values()) {
+ addResourceRequestToAsk(request);
+ }
+ }
+ }
+ if (!ignoreBlacklisting.get()) {
+ blacklistAdditions.addAll(blacklistedNodes);
+ }
+ if (!pendingRelease.isEmpty()) {
+ release.addAll(pendingRelease);
+ }
+ }
+
// May be incorrect if there's multiple NodeManagers running on a single host.
// knownNodeCount is based on node managers, not hosts. blacklisting is
// currently based on hosts.
Modified: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java Wed Jul 23 01:47:28 2014
@@ -18,17 +18,26 @@
package org.apache.hadoop.mapred;
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.io.File;
+import java.io.IOException;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -46,6 +55,9 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -53,6 +65,36 @@ import org.mockito.stubbing.Answer;
public class TestLocalContainerLauncher {
private static final Log LOG =
LogFactory.getLog(TestLocalContainerLauncher.class);
+ private static File testWorkDir;
+ private static final String[] localDirs = new String[2];
+
+ private static void delete(File dir) throws IOException {
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path p = fs.makeQualified(new Path(dir.getAbsolutePath()));
+ fs.delete(p, true);
+ }
+
+ @BeforeClass
+ public static void setupTestDirs() throws IOException {
+ testWorkDir = new File("target",
+ TestLocalContainerLauncher.class.getCanonicalName());
+ testWorkDir.delete();
+ testWorkDir.mkdirs();
+ testWorkDir = testWorkDir.getAbsoluteFile();
+ for (int i = 0; i < localDirs.length; i++) {
+ final File dir = new File(testWorkDir, "local-" + i);
+ dir.mkdirs();
+ localDirs[i] = dir.toString();
+ }
+ }
+
+ @AfterClass
+ public static void cleanupTestDirs() throws IOException {
+ if (testWorkDir != null) {
+ delete(testWorkDir);
+ }
+ }
@SuppressWarnings("rawtypes")
@Test(timeout=10000)
@@ -141,4 +183,35 @@ public class TestLocalContainerLauncher
when(container.getNodeId()).thenReturn(nodeId);
return container;
}
+
+
+ @Test
+ public void testRenameMapOutputForReduce() throws Exception {
+ final JobConf conf = new JobConf();
+
+ final MROutputFiles mrOutputFiles = new MROutputFiles();
+ mrOutputFiles.setConf(conf);
+
+ // make sure both dirs are distinct
+ //
+ conf.set(MRConfig.LOCAL_DIR, localDirs[0].toString());
+ final Path mapOut = mrOutputFiles.getOutputFileForWrite(1);
+ conf.set(MRConfig.LOCAL_DIR, localDirs[1].toString());
+ final Path mapOutIdx = mrOutputFiles.getOutputIndexFileForWrite(1);
+ Assert.assertNotEquals("Paths must be different!",
+ mapOut.getParent(), mapOutIdx.getParent());
+
+ // make both dirs part of LOCAL_DIR
+ conf.setStrings(MRConfig.LOCAL_DIR, localDirs);
+
+ final FileContext lfc = FileContext.getLocalFSFileContext(conf);
+ lfc.create(mapOut, EnumSet.of(CREATE)).close();
+ lfc.create(mapOutIdx, EnumSet.of(CREATE)).close();
+
+ final JobId jobId = MRBuilderUtils.newJobId(12345L, 1, 2);
+ final TaskId tid = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+ final TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 0);
+
+ LocalContainerLauncher.renameMapOutputForReduce(conf, taid, mrOutputFiles);
+ }
}
Modified: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java Wed Jul 23 01:47:28 2014
@@ -78,6 +78,7 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -87,6 +88,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
@@ -95,9 +97,13 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -618,6 +624,10 @@ public class TestRMContainerAllocator {
super(conf);
}
+ public MyResourceManager(Configuration conf, RMStateStore store) {
+ super(conf, store);
+ }
+
@Override
public void serviceStart() throws Exception {
super.serviceStart();
@@ -1426,6 +1436,13 @@ public class TestRMContainerAllocator {
rm.getMyFifoScheduler().lastBlacklistRemovals.size());
}
+ private static void assertAsksAndReleases(int expectedAsk,
+ int expectedRelease, MyResourceManager rm) {
+ Assert.assertEquals(expectedAsk, rm.getMyFifoScheduler().lastAsk.size());
+ Assert.assertEquals(expectedRelease,
+ rm.getMyFifoScheduler().lastRelease.size());
+ }
+
private static class MyFifoScheduler extends FifoScheduler {
public MyFifoScheduler(RMContext rmContext) {
@@ -1440,6 +1457,7 @@ public class TestRMContainerAllocator {
}
List<ResourceRequest> lastAsk = null;
+ List<ContainerId> lastRelease = null;
List<String> lastBlacklistAdditions;
List<String> lastBlacklistRemovals;
@@ -1458,6 +1476,7 @@ public class TestRMContainerAllocator {
askCopy.add(reqCopy);
}
lastAsk = ask;
+ lastRelease = release;
lastBlacklistAdditions = blacklistAdditions;
lastBlacklistRemovals = blacklistRemovals;
return super.allocate(
@@ -1505,6 +1524,20 @@ public class TestRMContainerAllocator {
return new ContainerFailedEvent(attemptId, host);
}
+ private ContainerAllocatorEvent createDeallocateEvent(JobId jobId,
+ int taskAttemptId, boolean reduce) {
+ TaskId taskId;
+ if (reduce) {
+ taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
+ } else {
+ taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+ }
+ TaskAttemptId attemptId =
+ MRBuilderUtils.newTaskAttemptId(taskId, taskAttemptId);
+ return new ContainerAllocatorEvent(attemptId,
+ ContainerAllocator.EventType.CONTAINER_DEALLOCATE);
+ }
+
private void checkAssignments(ContainerRequestEvent[] requests,
List<TaskAttemptContainerAssignedEvent> assignments,
boolean checkHostMatch) {
@@ -1557,6 +1590,7 @@ public class TestRMContainerAllocator {
= new ArrayList<JobUpdatedNodesEvent>();
private MyResourceManager rm;
private boolean isUnregistered = false;
+ private AllocateResponse allocateResponse;
private static AppContext createAppContext(
ApplicationAttemptId appAttemptId, Job job) {
AppContext context = mock(AppContext.class);
@@ -1668,6 +1702,10 @@ public class TestRMContainerAllocator {
super.handleEvent(f);
}
+ public void sendDeallocate(ContainerAllocatorEvent f) {
+ super.handleEvent(f);
+ }
+
// API to be used by tests
public List<TaskAttemptContainerAssignedEvent> schedule()
throws Exception {
@@ -1713,6 +1751,20 @@ public class TestRMContainerAllocator {
public boolean isUnregistered() {
return isUnregistered;
}
+
+ public void updateSchedulerProxy(MyResourceManager rm) {
+ scheduler = rm.getApplicationMasterService();
+ }
+
+ @Override
+ protected AllocateResponse makeRemoteRequest() throws IOException {
+ allocateResponse = super.makeRemoteRequest();
+ return allocateResponse;
+ }
+
+ public boolean isResyncCommand() {
+ return super.isResyncCommand(allocateResponse);
+ }
}
@Test
@@ -2022,6 +2074,198 @@ public class TestRMContainerAllocator {
Assert.assertTrue(allocator.isUnregistered());
}
+ // Step-1 : AM sends an allocate request for 2 ContainerRequests and 1
+ // blacklisted node
+ // Step-2 : 2 containers are allocated by RM.
+ // Step-3 : AM sends 1 containerRequest (event3) and 1 releaseRequest to
+ // RM
+ // Step-4 : On RM restart, AM (does not know RM is restarted) sends
+ // additional containerRequest (event4) and blacklisted nodes.
+ // In turn RM sends the resync command
+ // Step-5 : On resync, AM sends all outstanding
+ // asks, releases, blacklistAdditions,
+ // and another containerRequest (event5)
+ // Step-6 : RM allocates containers i.e. event3, event4 and cRequest5
+ @Test
+ public void testRMContainerAllocatorResendsRequestsOnRMRestart()
+ throws Exception {
+
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
+ conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+ conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
+
+ conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
+ conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+ conf.setInt(
+ MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
+
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+
+ MyResourceManager rm1 = new MyResourceManager(conf, memStore);
+ rm1.start();
+ DrainDispatcher dispatcher =
+ (DrainDispatcher) rm1.getRMContext().getDispatcher();
+
+ // Submit the application
+ RMApp app = rm1.submitApp(1024);
+ dispatcher.await();
+
+ MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ nm1.nodeHeartbeat(true); // Node heartbeat
+ dispatcher.await();
+
+ ApplicationAttemptId appAttemptId =
+ app.getCurrentAppAttempt().getAppAttemptId();
+ rm1.sendAMLaunched(appAttemptId);
+ dispatcher.await();
+
+ JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+ Job mockJob = mock(Job.class);
+ when(mockJob.getReport()).thenReturn(
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+ MyContainerAllocator allocator =
+ new MyContainerAllocator(rm1, conf, appAttemptId, mockJob);
+
+ // Step-1 : AM sends an allocate request for 2 ContainerRequests and 1
+ // blacklisted node
+ // create the container request
+ // send MAP request
+ ContainerRequestEvent event1 =
+ createReq(jobId, 1, 1024, new String[] { "h1" });
+ allocator.sendRequest(event1);
+
+ ContainerRequestEvent event2 =
+ createReq(jobId, 2, 2048, new String[] { "h1", "h2" });
+ allocator.sendRequest(event2);
+
+ // Send events to blacklist h2
+ ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h2", false);
+ allocator.sendFailure(f1);
+
+ // send allocate request and 1 blacklisted nodes
+ List<TaskAttemptContainerAssignedEvent> assignedContainers =
+ allocator.schedule();
+ dispatcher.await();
+ Assert.assertEquals("No of assignments must be 0", 0,
+ assignedContainers.size());
+ // Why is ask 3, not 4? --> the ask from blacklisted node h2 is removed
+ assertAsksAndReleases(3, 0, rm1);
+ assertBlacklistAdditionsAndRemovals(1, 0, rm1);
+
+ nm1.nodeHeartbeat(true); // Node heartbeat
+ dispatcher.await();
+
+ // Step-2 : 2 containers are allocated by RM.
+ assignedContainers = allocator.schedule();
+ dispatcher.await();
+ Assert.assertEquals("No of assignments must be 2", 2,
+ assignedContainers.size());
+ assertAsksAndReleases(0, 0, rm1);
+ assertBlacklistAdditionsAndRemovals(0, 0, rm1);
+
+ assignedContainers = allocator.schedule();
+ Assert.assertEquals("No of assignments must be 0", 0,
+ assignedContainers.size());
+ assertAsksAndReleases(3, 0, rm1);
+ assertBlacklistAdditionsAndRemovals(0, 0, rm1);
+
+ // Step-3 : AM Send 1 containerRequest(event3) and 1 releaseRequests to
+ // RM
+ // send container request
+ ContainerRequestEvent event3 =
+ createReq(jobId, 3, 1000, new String[] { "h1" });
+ allocator.sendRequest(event3);
+
+ // send deallocate request
+ ContainerAllocatorEvent deallocate1 =
+ createDeallocateEvent(jobId, 1, false);
+ allocator.sendDeallocate(deallocate1);
+
+ assignedContainers = allocator.schedule();
+ Assert.assertEquals("No of assignments must be 0", 0,
+ assignedContainers.size());
+ assertAsksAndReleases(3, 1, rm1);
+ assertBlacklistAdditionsAndRemovals(0, 0, rm1);
+
+ // Phase-2 start 2nd RM is up
+ MyResourceManager rm2 = new MyResourceManager(conf, memStore);
+ rm2.start();
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+ allocator.updateSchedulerProxy(rm2);
+ dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
+
+ // NM should be rebooted on heartbeat, even first heartbeat for nm2
+ NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
+ Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction());
+
+ // new NM to represent NM re-register
+ nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService());
+ nm1.registerNode();
+ nm1.nodeHeartbeat(true);
+ dispatcher.await();
+
+ // Step-4 : On RM restart, AM (does not know RM is restarted) sends
+ // additional containerRequest (event4) and blacklisted nodes.
+ // In turn RM sends the resync command
+
+ // send deallocate request, release=1
+ ContainerAllocatorEvent deallocate2 =
+ createDeallocateEvent(jobId, 2, false);
+ allocator.sendDeallocate(deallocate2);
+
+ // Send events to blacklist nodes h3
+ ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h3", false);
+ allocator.sendFailure(f2);
+
+ ContainerRequestEvent event4 =
+ createReq(jobId, 4, 2000, new String[] { "h1", "h2" });
+ allocator.sendRequest(event4);
+
+ // send allocate request to 2nd RM and get resync command
+ allocator.schedule();
+ dispatcher.await();
+ Assert.assertTrue("Last allocate response is not RESYNC",
+ allocator.isResyncCommand());
+
+ // Step-5 : On resync, AM sends all outstanding
+ // asks, releases, blacklistAdditions,
+ // and another containerRequest (event5)
+ ContainerRequestEvent event5 =
+ createReq(jobId, 5, 3000, new String[] { "h1", "h2", "h3" });
+ allocator.sendRequest(event5);
+
+ // send all outstanding requests again.
+ assignedContainers = allocator.schedule();
+ dispatcher.await();
+ assertAsksAndReleases(3, 2, rm2);
+ assertBlacklistAdditionsAndRemovals(2, 0, rm2);
+
+ nm1.nodeHeartbeat(true);
+ dispatcher.await();
+
+ // Step-6 : RM allocates containers i.e. event3, event4 and cRequest5
+ assignedContainers = allocator.schedule();
+ dispatcher.await();
+
+ Assert.assertEquals("Number of container should be 3", 3,
+ assignedContainers.size());
+
+ for (TaskAttemptContainerAssignedEvent assig : assignedContainers) {
+ Assert.assertTrue("Assigned count not correct",
+ "h1".equals(assig.getContainer().getNodeId().getHost()));
+ }
+
+ rm1.stop();
+ rm2.stop();
+
+ }
+
public static void main(String[] args) throws Exception {
TestRMContainerAllocator t = new TestRMContainerAllocator();
t.testSimple();
Modified: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Wed Jul 23 01:47:28 2014
@@ -327,8 +327,8 @@ public class MRApps extends Apps {
}
/**
- * Sets a {@link ApplicationClassLoader} on the given configuration and as
- * the context classloader, if
+ * Creates and sets a {@link ApplicationClassLoader} on the given
+ * configuration and as the thread context classloader, if
* {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and
* the APP_CLASSPATH environment variable is set.
* @param conf
@@ -336,24 +336,52 @@ public class MRApps extends Apps {
*/
public static void setJobClassLoader(Configuration conf)
throws IOException {
+ setClassLoader(createJobClassLoader(conf), conf);
+ }
+
+ /**
+ * Creates a {@link ApplicationClassLoader} if
+ * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and
+ * the APP_CLASSPATH environment variable is set.
+ * @param conf
+ * @returns the created job classloader, or null if the job classloader is not
+ * enabled or the APP_CLASSPATH environment variable is not set
+ * @throws IOException
+ */
+ public static ClassLoader createJobClassLoader(Configuration conf)
+ throws IOException {
+ ClassLoader jobClassLoader = null;
if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)) {
String appClasspath = System.getenv(Environment.APP_CLASSPATH.key());
if (appClasspath == null) {
- LOG.warn("Not using job classloader since APP_CLASSPATH is not set.");
+ LOG.warn("Not creating job classloader since APP_CLASSPATH is not set.");
} else {
- LOG.info("Using job classloader");
+ LOG.info("Creating job classloader");
if (LOG.isDebugEnabled()) {
LOG.debug("APP_CLASSPATH=" + appClasspath);
}
String[] systemClasses = getSystemClasses(conf);
- ClassLoader jobClassLoader = createJobClassLoader(appClasspath,
+ jobClassLoader = createJobClassLoader(appClasspath,
systemClasses);
- if (jobClassLoader != null) {
- conf.setClassLoader(jobClassLoader);
- Thread.currentThread().setContextClassLoader(jobClassLoader);
- }
}
}
+ return jobClassLoader;
+ }
+
+ /**
+ * Sets the provided classloader on the given configuration and as the thread
+ * context classloader if the classloader is not null.
+ * @param classLoader
+ * @param conf
+ */
+ public static void setClassLoader(ClassLoader classLoader,
+ Configuration conf) {
+ if (classLoader != null) {
+ LOG.info("Setting classloader " + classLoader.getClass().getName() +
+ " on the configuration and as the thread context classloader");
+ conf.setClassLoader(classLoader);
+ Thread.currentThread().setContextClassLoader(classLoader);
+ }
}
@VisibleForTesting
Modified: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java Wed Jul 23 01:47:28 2014
@@ -579,7 +579,7 @@ public abstract class CombineFileInputFo
blocks = new OneBlockInfo[0];
} else {
- if(locations.length == 0) {
+ if(locations.length == 0 && !stat.isDirectory()) {
locations = new BlockLocation[] { new BlockLocation() };
}
Modified: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java Wed Jul 23 01:47:28 2014
@@ -1275,6 +1275,61 @@ public class TestCombineFileInputFormat
}
/**
+ * Test that directories do not get included as part of getSplits()
+ */
+ @Test
+ public void testGetSplitsWithDirectory() throws Exception {
+ MiniDFSCluster dfs = null;
+ try {
+ Configuration conf = new Configuration();
+ dfs = new MiniDFSCluster.Builder(conf).racks(rack1).hosts(hosts1)
+ .build();
+ dfs.waitActive();
+
+ dfs = new MiniDFSCluster.Builder(conf).racks(rack1).hosts(hosts1)
+ .build();
+ dfs.waitActive();
+
+ FileSystem fileSys = dfs.getFileSystem();
+
+ // Set up the following directory structure:
+ // /dir1/: directory
+ // /dir1/file1: regular file
+ // /dir1/dir2/: directory
+ Path dir1 = new Path("/dir1");
+ Path file = new Path("/dir1/file1");
+ Path dir2 = new Path("/dir1/dir2");
+ if (!fileSys.mkdirs(dir1)) {
+ throw new IOException("Mkdirs failed to create " + dir1.toString());
+ }
+ FSDataOutputStream out = fileSys.create(file);
+ out.write(new byte[0]);
+ out.close();
+ if (!fileSys.mkdirs(dir2)) {
+ throw new IOException("Mkdirs failed to create " + dir2.toString());
+ }
+
+ // split it using a CombinedFile input format
+ DummyInputFormat inFormat = new DummyInputFormat();
+ Job job = Job.getInstance(conf);
+ FileInputFormat.setInputPaths(job, "/dir1");
+ List<InputSplit> splits = inFormat.getSplits(job);
+
+ // directories should be omitted from getSplits() - we should only see file1 and not dir2
+ assertEquals(1, splits.size());
+ CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(file.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(0, fileSplit.getLength(0));
+ } finally {
+ if (dfs != null) {
+ dfs.shutdown();
+ }
+ }
+ }
+
+ /**
* Test when input files are from non-default file systems
*/
@Test
Modified: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Wed Jul 23 01:47:28 2014
@@ -33,8 +33,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;
-import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.FailingMapper;
@@ -77,6 +77,10 @@ import org.apache.hadoop.mapreduce.lib.o
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
+import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -86,6 +90,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.util.ApplicationClassLoader;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.Level;
import org.junit.AfterClass;
@@ -210,7 +215,19 @@ public class TestMRJobs {
@Test(timeout = 300000)
public void testJobClassloader() throws IOException, InterruptedException,
ClassNotFoundException {
- LOG.info("\n\n\nStarting testJobClassloader().");
+ testJobClassloader(false);
+ }
+
+ @Test(timeout = 300000)
+ public void testJobClassloaderWithCustomClasses() throws IOException,
+ InterruptedException, ClassNotFoundException {
+ testJobClassloader(true);
+ }
+
+ private void testJobClassloader(boolean useCustomClasses) throws IOException,
+ InterruptedException, ClassNotFoundException {
+ LOG.info("\n\n\nStarting testJobClassloader()"
+ + " useCustomClasses=" + useCustomClasses);
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
@@ -221,6 +238,19 @@ public class TestMRJobs {
// set master address to local to test that local mode applied iff framework == local
sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
sleepConf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
+ if (useCustomClasses) {
+ // to test AM loading user classes such as output format class, we want
+ // to blacklist them from the system classes (they need to be prepended
+ // as the first match wins)
+ String systemClasses =
+ sleepConf.get(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES);
+ // exclude the custom classes from system classes
+ systemClasses = "-" + CustomOutputFormat.class.getName() + ",-" +
+ CustomSpeculator.class.getName() + "," +
+ systemClasses;
+ sleepConf.set(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES,
+ systemClasses);
+ }
sleepConf.set(MRJobConfig.IO_SORT_MB, TEST_IO_SORT_MB);
sleepConf.set(MRJobConfig.MR_AM_LOG_LEVEL, Level.ALL.toString());
sleepConf.set(MRJobConfig.MAP_LOG_LEVEL, Level.ALL.toString());
@@ -233,12 +263,66 @@ public class TestMRJobs {
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.setJarByClass(SleepJob.class);
job.setMaxMapAttempts(1); // speed up failures
+ if (useCustomClasses) {
+ // set custom output format class and speculator class
+ job.setOutputFormatClass(CustomOutputFormat.class);
+ final Configuration jobConf = job.getConfiguration();
+ jobConf.setClass(MRJobConfig.MR_AM_JOB_SPECULATOR, CustomSpeculator.class,
+ Speculator.class);
+ // speculation needs to be enabled for the speculator to be loaded
+ jobConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, true);
+ }
job.submit();
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue("Job status: " + job.getStatus().getFailureInfo(),
succeeded);
}
+ public static class CustomOutputFormat<K,V> extends NullOutputFormat<K,V> {
+ public CustomOutputFormat() {
+ verifyClassLoader(getClass());
+ }
+
+ /**
+ * Verifies that the class was loaded by the job classloader if it is in the
+ * context of the MRAppMaster, and if not throws an exception to fail the
+ * job.
+ */
+ private void verifyClassLoader(Class<?> cls) {
+ // to detect that it is instantiated in the context of the MRAppMaster, we
+      // inspect the stack trace and determine whether a caller is MRAppMaster
+ for (StackTraceElement e: new Throwable().getStackTrace()) {
+ if (e.getClassName().equals(MRAppMaster.class.getName()) &&
+ !(cls.getClassLoader() instanceof ApplicationClassLoader)) {
+ throw new ExceptionInInitializerError("incorrect classloader used");
+ }
+ }
+ }
+ }
+
+ public static class CustomSpeculator extends DefaultSpeculator {
+ public CustomSpeculator(Configuration conf, AppContext context) {
+ super(conf, context);
+ verifyClassLoader(getClass());
+ }
+
+ /**
+ * Verifies that the class was loaded by the job classloader if it is in the
+ * context of the MRAppMaster, and if not throws an exception to fail the
+ * job.
+ */
+ private void verifyClassLoader(Class<?> cls) {
+ // to detect that it is instantiated in the context of the MRAppMaster, we
+      // inspect the stack trace and determine whether a caller is MRAppMaster
+ for (StackTraceElement e: new Throwable().getStackTrace()) {
+ if (e.getClassName().equals(MRAppMaster.class.getName()) &&
+ !(cls.getClassLoader() instanceof ApplicationClassLoader)) {
+ throw new ExceptionInInitializerError("incorrect classloader used");
+ }
+ }
+ }
+ }
+
protected void verifySleepJobCounters(Job job) throws InterruptedException,
IOException {
Counters counters = job.getCounters();
Modified: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Wed Jul 23 01:47:28 2014
@@ -82,10 +82,13 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProto;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.records.impl.pb.NMDBSchemaVersionPBImpl;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.fusesource.leveldbjni.JniDBFactory;
@@ -125,6 +128,7 @@ import org.jboss.netty.handler.stream.Ch
import org.jboss.netty.util.CharsetUtil;
import org.mortbay.jetty.HttpHeaders;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
@@ -146,8 +150,9 @@ public class ShuffleHandler extends Auxi
Pattern.CASE_INSENSITIVE);
private static final String STATE_DB_NAME = "mapreduce_shuffle_state";
- private static final String STATE_DB_SCHEMA_VERSION_KEY = "schema-version";
- private static final String STATE_DB_SCHEMA_VERSION = "1.0";
+ private static final String STATE_DB_SCHEMA_VERSION_KEY = "shuffle-schema-version";
+ protected static final NMDBSchemaVersion CURRENT_VERSION_INFO =
+ NMDBSchemaVersion.newInstance(1, 0);
private int port;
private ChannelFactory selector;
@@ -466,18 +471,15 @@ public class ShuffleHandler extends Auxi
Path dbPath = new Path(recoveryRoot, STATE_DB_NAME);
LOG.info("Using state database at " + dbPath + " for recovery");
File dbfile = new File(dbPath.toString());
- byte[] schemaVersionData;
try {
stateDb = JniDBFactory.factory.open(dbfile, options);
- schemaVersionData = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY));
} catch (NativeDB.DBException e) {
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
LOG.info("Creating state database at " + dbfile);
options.createIfMissing(true);
try {
stateDb = JniDBFactory.factory.open(dbfile, options);
- schemaVersionData = bytes(STATE_DB_SCHEMA_VERSION);
- stateDb.put(bytes(STATE_DB_SCHEMA_VERSION_KEY), schemaVersionData);
+ storeVersion();
} catch (DBException dbExc) {
throw new IOException("Unable to create state store", dbExc);
}
@@ -485,15 +487,69 @@ public class ShuffleHandler extends Auxi
throw e;
}
}
- if (schemaVersionData != null) {
- String schemaVersion = asString(schemaVersionData);
- // only support exact schema matches for now
- if (!STATE_DB_SCHEMA_VERSION.equals(schemaVersion)) {
- throw new IOException("Incompatible state database schema, found "
- + schemaVersion + " expected " + STATE_DB_SCHEMA_VERSION);
- }
+ checkVersion();
+ }
+
+ @VisibleForTesting
+ NMDBSchemaVersion loadVersion() throws IOException {
+ byte[] data = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY));
+ // if version is not stored previously, treat it as 1.0.
+ if (data == null || data.length == 0) {
+ return NMDBSchemaVersion.newInstance(1, 0);
+ }
+ NMDBSchemaVersion version =
+ new NMDBSchemaVersionPBImpl(NMDBSchemaVersionProto.parseFrom(data));
+ return version;
+ }
+
+ private void storeSchemaVersion(NMDBSchemaVersion version) throws IOException {
+ String key = STATE_DB_SCHEMA_VERSION_KEY;
+ byte[] data =
+ ((NMDBSchemaVersionPBImpl) version).getProto().toByteArray();
+ try {
+ stateDb.put(bytes(key), data);
+ } catch (DBException e) {
+ throw new IOException(e.getMessage(), e);
+ }
+ }
+
+ private void storeVersion() throws IOException {
+ storeSchemaVersion(CURRENT_VERSION_INFO);
+ }
+
+ // Only used for test
+ @VisibleForTesting
+ void storeVersion(NMDBSchemaVersion version) throws IOException {
+ storeSchemaVersion(version);
+ }
+
+ protected NMDBSchemaVersion getCurrentVersion() {
+ return CURRENT_VERSION_INFO;
+ }
+
+ /**
+ * 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
+ * 2) Any incompatible change of DB schema is a major upgrade, and any
+ * compatible change of DB schema is a minor upgrade.
+ * 3) Within a minor upgrade, say 1.1 to 1.2:
+ * overwrite the version info and proceed as normal.
+ * 4) Within a major upgrade, say 1.2 to 2.0:
+ * throw exception and indicate user to use a separate upgrade tool to
+ * upgrade shuffle info or remove incompatible old state.
+ */
+ private void checkVersion() throws IOException {
+ NMDBSchemaVersion loadedVersion = loadVersion();
+ LOG.info("Loaded state DB schema version info " + loadedVersion);
+ if (loadedVersion.equals(getCurrentVersion())) {
+ return;
+ }
+ if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
+      LOG.info("Storing state DB schema version info " + getCurrentVersion());
+ storeVersion();
} else {
- throw new IOException("State database schema version not found");
+ throw new IOException(
+ "Incompatible version for state DB schema: expecting DB schema version "
+ + getCurrentVersion() + ", but loading version " + loadedVersion);
}
}
Modified: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java Wed Jul 23 01:47:28 2014
@@ -67,6 +67,7 @@ import org.apache.hadoop.metrics2.Metric
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.util.PureJavaCrc32;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.conf.YarnC
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
@@ -718,6 +720,94 @@ public class TestShuffleHandler {
FileUtil.fullyDelete(tmpDir);
}
}
+
+ @Test
+ public void testRecoveryFromOtherVersions() throws IOException {
+ final String user = "someuser";
+ final ApplicationId appId = ApplicationId.newInstance(12345, 1);
+ final File tmpDir = new File(System.getProperty("test.build.data",
+ System.getProperty("java.io.tmpdir")),
+ TestShuffleHandler.class.getName());
+ Configuration conf = new Configuration();
+ conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+ conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
+ ShuffleHandler shuffle = new ShuffleHandler();
+ // emulate aux services startup with recovery enabled
+ shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+ tmpDir.mkdirs();
+ try {
+ shuffle.init(conf);
+ shuffle.start();
+
+ // setup a shuffle token for an application
+ DataOutputBuffer outputBuffer = new DataOutputBuffer();
+ outputBuffer.reset();
+ Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>(
+ "identifier".getBytes(), "password".getBytes(), new Text(user),
+ new Text("shuffleService"));
+ jt.write(outputBuffer);
+ shuffle.initializeApplication(new ApplicationInitializationContext(user,
+ appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
+ outputBuffer.getLength())));
+
+ // verify we are authorized to shuffle
+ int rc = getShuffleResponseCode(shuffle, jt);
+ Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+
+ // emulate shuffle handler restart
+ shuffle.close();
+ shuffle = new ShuffleHandler();
+ shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+ shuffle.init(conf);
+ shuffle.start();
+
+ // verify we are still authorized to shuffle to the old application
+ rc = getShuffleResponseCode(shuffle, jt);
+ Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+ NMDBSchemaVersion version = NMDBSchemaVersion.newInstance(1, 0);
+ Assert.assertEquals(version, shuffle.getCurrentVersion());
+
+ // emulate shuffle handler restart with compatible version
+ NMDBSchemaVersion version11 = NMDBSchemaVersion.newInstance(1, 1);
+ // update version info before close shuffle
+ shuffle.storeVersion(version11);
+ Assert.assertEquals(version11, shuffle.loadVersion());
+ shuffle.close();
+ shuffle = new ShuffleHandler();
+ shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+ shuffle.init(conf);
+ shuffle.start();
+      // shuffle version will be overridden by CURRENT_VERSION_INFO after restart
+ // successfully.
+ Assert.assertEquals(version, shuffle.loadVersion());
+ // verify we are still authorized to shuffle to the old application
+ rc = getShuffleResponseCode(shuffle, jt);
+ Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+
+ // emulate shuffle handler restart with incompatible version
+ NMDBSchemaVersion version21 = NMDBSchemaVersion.newInstance(2, 1);
+ shuffle.storeVersion(version21);
+ Assert.assertEquals(version21, shuffle.loadVersion());
+ shuffle.close();
+ shuffle = new ShuffleHandler();
+ shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+ shuffle.init(conf);
+
+ try {
+ shuffle.start();
+ Assert.fail("Incompatible version, should expect fail here.");
+ } catch (ServiceStateException e) {
+ Assert.assertTrue("Exception message mismatch",
+ e.getMessage().contains("Incompatible version for state DB schema:"));
+ }
+
+ } finally {
+ if (shuffle != null) {
+ shuffle.close();
+ }
+ FileUtil.fullyDelete(tmpDir);
+ }
+ }
private static int getShuffleResponseCode(ShuffleHandler shuffle,
Token<JobTokenIdentifier> jt) throws IOException {