You are viewing a plain-text version of this content. The canonical version is available via the original archive link, which was not preserved in this text.
Posted to mapreduce-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:50:52 UTC
svn commit: r1619012 [2/7] - in
/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project: ./ bin/
conf/ dev-support/ hadoop-mapreduce-client/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/...
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java Tue Aug 19 23:49:39 2014
@@ -24,8 +24,14 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSError;
@@ -51,11 +57,14 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/**
* Runs the container task locally in a thread.
* Since all (sub)tasks share the same local directory, they must be executed
@@ -71,7 +80,8 @@ public class LocalContainerLauncher exte
private final HashSet<File> localizedFiles;
private final AppContext context;
private final TaskUmbilicalProtocol umbilical;
- private Thread eventHandlingThread;
+ private ExecutorService taskRunner;
+ private Thread eventHandler;
private BlockingQueue<ContainerLauncherEvent> eventQueue =
new LinkedBlockingQueue<ContainerLauncherEvent>();
@@ -115,14 +125,24 @@ public class LocalContainerLauncher exte
}
public void serviceStart() throws Exception {
- eventHandlingThread = new Thread(new SubtaskRunner(), "uber-SubtaskRunner");
- eventHandlingThread.start();
+ // create a single thread for serial execution of tasks
+ // make it a daemon thread so that the process can exit even if the task is
+ // not interruptible
+ taskRunner =
+ Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().
+ setDaemon(true).setNameFormat("uber-SubtaskRunner").build());
+ // create and start an event handling thread
+ eventHandler = new Thread(new EventHandler(), "uber-EventHandler");
+ eventHandler.start();
super.serviceStart();
}
public void serviceStop() throws Exception {
- if (eventHandlingThread != null) {
- eventHandlingThread.interrupt();
+ if (eventHandler != null) {
+ eventHandler.interrupt();
+ }
+ if (taskRunner != null) {
+ taskRunner.shutdownNow();
}
super.serviceStop();
}
@@ -158,12 +178,17 @@ public class LocalContainerLauncher exte
* - runs Task (runSubMap() or runSubReduce())
* - TA can safely send TA_UPDATE since in RUNNING state
*/
- private class SubtaskRunner implements Runnable {
+ private class EventHandler implements Runnable {
+ // doneWithMaps and finishedSubMaps are accessed from only
+ // one thread. Therefore, no need to make them volatile.
private boolean doneWithMaps = false;
private int finishedSubMaps = 0;
- SubtaskRunner() {
+ private final Map<TaskAttemptId,Future<?>> futures =
+ new ConcurrentHashMap<TaskAttemptId,Future<?>>();
+
+ EventHandler() {
}
@SuppressWarnings("unchecked")
@@ -172,7 +197,7 @@ public class LocalContainerLauncher exte
ContainerLauncherEvent event = null;
// Collect locations of map outputs to give to reduces
- Map<TaskAttemptID, MapOutputFile> localMapFiles =
+ final Map<TaskAttemptID, MapOutputFile> localMapFiles =
new HashMap<TaskAttemptID, MapOutputFile>();
// _must_ either run subtasks sequentially or accept expense of new JVMs
@@ -183,81 +208,41 @@ public class LocalContainerLauncher exte
event = eventQueue.take();
} catch (InterruptedException e) { // mostly via T_KILL? JOB_KILL?
LOG.error("Returning, interrupted : " + e);
- return;
+ break;
}
LOG.info("Processing the event " + event.toString());
if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) {
- ContainerRemoteLaunchEvent launchEv =
+ final ContainerRemoteLaunchEvent launchEv =
(ContainerRemoteLaunchEvent)event;
- TaskAttemptId attemptID = launchEv.getTaskAttemptID();
-
- Job job = context.getAllJobs().get(attemptID.getTaskId().getJobId());
- int numMapTasks = job.getTotalMaps();
- int numReduceTasks = job.getTotalReduces();
-
- // YARN (tracking) Task:
- org.apache.hadoop.mapreduce.v2.app.job.Task ytask =
- job.getTask(attemptID.getTaskId());
- // classic mapred Task:
- org.apache.hadoop.mapred.Task remoteTask = launchEv.getRemoteTask();
-
- // after "launching," send launched event to task attempt to move
- // state from ASSIGNED to RUNNING (also nukes "remoteTask", so must
- // do getRemoteTask() call first)
- //There is no port number because we are not really talking to a task
- // tracker. The shuffle is just done through local files. So the
- // port number is set to -1 in this case.
- context.getEventHandler().handle(
- new TaskAttemptContainerLaunchedEvent(attemptID, -1));
-
- if (numMapTasks == 0) {
- doneWithMaps = true;
- }
-
- try {
- if (remoteTask.isMapOrReduce()) {
- JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
- jce.addCounterUpdate(JobCounter.TOTAL_LAUNCHED_UBERTASKS, 1);
- if (remoteTask.isMapTask()) {
- jce.addCounterUpdate(JobCounter.NUM_UBER_SUBMAPS, 1);
- } else {
- jce.addCounterUpdate(JobCounter.NUM_UBER_SUBREDUCES, 1);
- }
- context.getEventHandler().handle(jce);
+ // execute the task on a separate thread
+ Future<?> future = taskRunner.submit(new Runnable() {
+ public void run() {
+ runTask(launchEv, localMapFiles);
}
- runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks,
- (numReduceTasks > 0), localMapFiles);
-
- } catch (RuntimeException re) {
- JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
- jce.addCounterUpdate(JobCounter.NUM_FAILED_UBERTASKS, 1);
- context.getEventHandler().handle(jce);
- // this is our signal that the subtask failed in some way, so
- // simulate a failed JVM/container and send a container-completed
- // event to task attempt (i.e., move state machine from RUNNING
- // to FAIL_CONTAINER_CLEANUP [and ultimately to FAILED])
- context.getEventHandler().handle(new TaskAttemptEvent(attemptID,
- TaskAttemptEventType.TA_CONTAINER_COMPLETED));
- } catch (IOException ioe) {
- // if umbilical itself barfs (in error-handler of runSubMap()),
- // we're pretty much hosed, so do what YarnChild main() does
- // (i.e., exit clumsily--but can never happen, so no worries!)
- LOG.fatal("oopsie... this can never happen: "
- + StringUtils.stringifyException(ioe));
- System.exit(-1);
- }
+ });
+ // remember the current attempt
+ futures.put(event.getTaskAttemptID(), future);
} else if (event.getType() == EventType.CONTAINER_REMOTE_CLEANUP) {
- // no container to kill, so just send "cleaned" event to task attempt
- // to move us from SUCCESS_CONTAINER_CLEANUP to SUCCEEDED state
- // (or {FAIL|KILL}_CONTAINER_CLEANUP to {FAIL|KILL}_TASK_CLEANUP)
+ // cancel (and interrupt) the current running task associated with the
+ // event
+ TaskAttemptId taId = event.getTaskAttemptID();
+ Future<?> future = futures.remove(taId);
+ if (future != null) {
+ LOG.info("canceling the task attempt " + taId);
+ future.cancel(true);
+ }
+
+ // send "cleaned" event to task attempt to move us from
+ // SUCCESS_CONTAINER_CLEANUP to SUCCEEDED state (or
+ // {FAIL|KILL}_CONTAINER_CLEANUP to {FAIL|KILL}_TASK_CLEANUP)
context.getEventHandler().handle(
- new TaskAttemptEvent(event.getTaskAttemptID(),
+ new TaskAttemptEvent(taId,
TaskAttemptEventType.TA_CONTAINER_CLEANED));
} else {
@@ -267,7 +252,75 @@ public class LocalContainerLauncher exte
}
}
- @SuppressWarnings("deprecation")
+ @SuppressWarnings("unchecked")
+ private void runTask(ContainerRemoteLaunchEvent launchEv,
+ Map<TaskAttemptID, MapOutputFile> localMapFiles) {
+ TaskAttemptId attemptID = launchEv.getTaskAttemptID();
+
+ Job job = context.getAllJobs().get(attemptID.getTaskId().getJobId());
+ int numMapTasks = job.getTotalMaps();
+ int numReduceTasks = job.getTotalReduces();
+
+ // YARN (tracking) Task:
+ org.apache.hadoop.mapreduce.v2.app.job.Task ytask =
+ job.getTask(attemptID.getTaskId());
+ // classic mapred Task:
+ org.apache.hadoop.mapred.Task remoteTask = launchEv.getRemoteTask();
+
+ // after "launching," send launched event to task attempt to move
+ // state from ASSIGNED to RUNNING (also nukes "remoteTask", so must
+ // do getRemoteTask() call first)
+
+ //There is no port number because we are not really talking to a task
+ // tracker. The shuffle is just done through local files. So the
+ // port number is set to -1 in this case.
+ context.getEventHandler().handle(
+ new TaskAttemptContainerLaunchedEvent(attemptID, -1));
+
+ if (numMapTasks == 0) {
+ doneWithMaps = true;
+ }
+
+ try {
+ if (remoteTask.isMapOrReduce()) {
+ JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
+ jce.addCounterUpdate(JobCounter.TOTAL_LAUNCHED_UBERTASKS, 1);
+ if (remoteTask.isMapTask()) {
+ jce.addCounterUpdate(JobCounter.NUM_UBER_SUBMAPS, 1);
+ } else {
+ jce.addCounterUpdate(JobCounter.NUM_UBER_SUBREDUCES, 1);
+ }
+ context.getEventHandler().handle(jce);
+ }
+ runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks,
+ (numReduceTasks > 0), localMapFiles);
+
+ } catch (RuntimeException re) {
+ JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
+ jce.addCounterUpdate(JobCounter.NUM_FAILED_UBERTASKS, 1);
+ context.getEventHandler().handle(jce);
+ // this is our signal that the subtask failed in some way, so
+ // simulate a failed JVM/container and send a container-completed
+ // event to task attempt (i.e., move state machine from RUNNING
+ // to FAIL_CONTAINER_CLEANUP [and ultimately to FAILED])
+ context.getEventHandler().handle(new TaskAttemptEvent(attemptID,
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED));
+ } catch (IOException ioe) {
+ // if umbilical itself barfs (in error-handler of runSubMap()),
+ // we're pretty much hosed, so do what YarnChild main() does
+ // (i.e., exit clumsily--but can never happen, so no worries!)
+ LOG.fatal("oopsie... this can never happen: "
+ + StringUtils.stringifyException(ioe));
+ ExitUtil.terminate(-1);
+ } finally {
+ // remove my future
+ if (futures.remove(attemptID) != null) {
+ LOG.info("removed attempt " + attemptID +
+ " from the futures to keep track of");
+ }
+ }
+ }
+
private void runSubtask(org.apache.hadoop.mapred.Task task,
final TaskType taskType,
TaskAttemptId attemptID,
@@ -355,7 +408,9 @@ public class LocalContainerLauncher exte
} catch (FSError e) {
LOG.fatal("FSError from child", e);
// umbilical: MRAppMaster creates (taskAttemptListener), passes to us
- umbilical.fsError(classicAttemptID, e.getMessage());
+ if (!ShutdownHookManager.get().isShutdownInProgress()) {
+ umbilical.fsError(classicAttemptID, e.getMessage());
+ }
throw new RuntimeException();
} catch (Exception exception) {
@@ -378,54 +433,18 @@ public class LocalContainerLauncher exte
} catch (Throwable throwable) {
LOG.fatal("Error running local (uberized) 'child' : "
+ StringUtils.stringifyException(throwable));
- Throwable tCause = throwable.getCause();
- String cause = (tCause == null)
- ? throwable.getMessage()
- : StringUtils.stringifyException(tCause);
- umbilical.fatalError(classicAttemptID, cause);
+ if (!ShutdownHookManager.get().isShutdownInProgress()) {
+ Throwable tCause = throwable.getCause();
+ String cause =
+ (tCause == null) ? throwable.getMessage() : StringUtils
+ .stringifyException(tCause);
+ umbilical.fatalError(classicAttemptID, cause);
+ }
throw new RuntimeException();
}
}
/**
- * Within the _local_ filesystem (not HDFS), all activity takes place within
- * a single subdir (${local.dir}/usercache/$user/appcache/$appId/$contId/),
- * and all sub-MapTasks create the same filename ("file.out"). Rename that
- * to something unique (e.g., "map_0.out") to avoid collisions.
- *
- * Longer-term, we'll modify [something] to use TaskAttemptID-based
- * filenames instead of "file.out". (All of this is entirely internal,
- * so there are no particular compatibility issues.)
- */
- @SuppressWarnings("deprecation")
- private MapOutputFile renameMapOutputForReduce(JobConf conf,
- TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
- FileSystem localFs = FileSystem.getLocal(conf);
- // move map output to reduce input
- Path mapOut = subMapOutputFile.getOutputFile();
- FileStatus mStatus = localFs.getFileStatus(mapOut);
- Path reduceIn = subMapOutputFile.getInputFileForWrite(
- TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
- Path mapOutIndex = new Path(mapOut.toString() + ".index");
- Path reduceInIndex = new Path(reduceIn.toString() + ".index");
- if (LOG.isDebugEnabled()) {
- LOG.debug("Renaming map output file for task attempt "
- + mapId.toString() + " from original location " + mapOut.toString()
- + " to destination " + reduceIn.toString());
- }
- if (!localFs.mkdirs(reduceIn.getParent())) {
- throw new IOException("Mkdirs failed to create "
- + reduceIn.getParent().toString());
- }
- if (!localFs.rename(mapOut, reduceIn))
- throw new IOException("Couldn't rename " + mapOut);
- if (!localFs.rename(mapOutIndex, reduceInIndex))
- throw new IOException("Couldn't rename " + mapOutIndex);
-
- return new RenamedMapOutputFile(reduceIn);
- }
-
- /**
* Also within the local filesystem, we need to restore the initial state
* of the directory as much as possible. Compare current contents against
* the saved original state and nuke everything that doesn't belong, with
@@ -456,8 +475,47 @@ public class LocalContainerLauncher exte
}
}
- } // end SubtaskRunner
-
+ } // end EventHandler
+
+ /**
+ * Within the _local_ filesystem (not HDFS), all activity takes place within
+ * a subdir inside one of the LOCAL_DIRS
+ * (${local.dir}/usercache/$user/appcache/$appId/$contId/),
+ * and all sub-MapTasks create the same filename ("file.out"). Rename that
+ * to something unique (e.g., "map_0.out") to avoid possible collisions.
+ *
+ * Longer-term, we'll modify [something] to use TaskAttemptID-based
+ * filenames instead of "file.out". (All of this is entirely internal,
+ * so there are no particular compatibility issues.)
+ */
+ @VisibleForTesting
+ protected static MapOutputFile renameMapOutputForReduce(JobConf conf,
+ TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
+ FileSystem localFs = FileSystem.getLocal(conf);
+ // move map output to reduce input
+ Path mapOut = subMapOutputFile.getOutputFile();
+ FileStatus mStatus = localFs.getFileStatus(mapOut);
+ Path reduceIn = subMapOutputFile.getInputFileForWrite(
+ TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
+ Path mapOutIndex = subMapOutputFile.getOutputIndexFile();
+ Path reduceInIndex = new Path(reduceIn.toString() + ".index");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Renaming map output file for task attempt "
+ + mapId.toString() + " from original location " + mapOut.toString()
+ + " to destination " + reduceIn.toString());
+ }
+ if (!localFs.mkdirs(reduceIn.getParent())) {
+ throw new IOException("Mkdirs failed to create "
+ + reduceIn.getParent().toString());
+ }
+ if (!localFs.rename(mapOut, reduceIn))
+ throw new IOException("Couldn't rename " + mapOut);
+ if (!localFs.rename(mapOutIndex, reduceInIndex))
+ throw new IOException("Couldn't rename " + mapOutIndex);
+
+ return new RenamedMapOutputFile(reduceIn);
+ }
+
private static class RenamedMapOutputFile extends MapOutputFile {
private Path path;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Tue Aug 19 23:49:39 2014
@@ -141,7 +141,9 @@ public class TaskAttemptListenerImpl ext
}
server.start();
- this.address = NetUtils.getConnectAddress(server);
+ this.address = NetUtils.createSocketAddrForHost(
+ context.getNMHostname(),
+ server.getListenerAddress().getPort());
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java Tue Aug 19 23:49:39 2014
@@ -56,6 +56,7 @@ import org.apache.hadoop.security.Securi
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -176,7 +177,9 @@ class YarnChild {
});
} catch (FSError e) {
LOG.fatal("FSError from child", e);
- umbilical.fsError(taskid, e.getMessage());
+ if (!ShutdownHookManager.get().isShutdownInProgress()) {
+ umbilical.fsError(taskid, e.getMessage());
+ }
} catch (Exception exception) {
LOG.warn("Exception running child : "
+ StringUtils.stringifyException(exception));
@@ -201,17 +204,22 @@ class YarnChild {
}
// Report back any failures, for diagnostic purposes
if (taskid != null) {
- umbilical.fatalError(taskid, StringUtils.stringifyException(exception));
+ if (!ShutdownHookManager.get().isShutdownInProgress()) {
+ umbilical.fatalError(taskid,
+ StringUtils.stringifyException(exception));
+ }
}
} catch (Throwable throwable) {
LOG.fatal("Error running child : "
+ StringUtils.stringifyException(throwable));
if (taskid != null) {
- Throwable tCause = throwable.getCause();
- String cause = tCause == null
- ? throwable.getMessage()
- : StringUtils.stringifyException(tCause);
- umbilical.fatalError(taskid, cause);
+ if (!ShutdownHookManager.get().isShutdownInProgress()) {
+ Throwable tCause = throwable.getCause();
+ String cause =
+ tCause == null ? throwable.getMessage() : StringUtils
+ .stringifyException(tCause);
+ umbilical.fatalError(taskid, cause);
+ }
}
} finally {
RPC.stopProxy(umbilical);
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Tue Aug 19 23:49:39 2014
@@ -28,13 +28,13 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
@@ -74,7 +74,9 @@ public class JobHistoryEventHandler exte
private int eventCounter;
- //TODO Does the FS object need to be different ?
+ // Those file systems may differ from the job configuration
+ // See org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils
+ // #ensurePathInDefaultFileSystem
private FileSystem stagingDirFS; // log Dir FileSystem
private FileSystem doneDirFS; // done Dir FileSystem
@@ -141,7 +143,7 @@ public class JobHistoryEventHandler exte
//Check for the existence of the history staging dir. Maybe create it.
try {
stagingDirPath =
- FileSystem.get(conf).makeQualified(new Path(stagingDirStr));
+ FileContext.getFileContext(conf).makeQualified(new Path(stagingDirStr));
stagingDirFS = FileSystem.get(stagingDirPath.toUri(), conf);
mkdir(stagingDirFS, stagingDirPath, new FsPermission(
JobHistoryUtils.HISTORY_STAGING_DIR_PERMISSIONS));
@@ -154,7 +156,7 @@ public class JobHistoryEventHandler exte
//Check for the existence of intermediate done dir.
Path doneDirPath = null;
try {
- doneDirPath = FileSystem.get(conf).makeQualified(new Path(doneDirStr));
+ doneDirPath = FileContext.getFileContext(conf).makeQualified(new Path(doneDirStr));
doneDirFS = FileSystem.get(doneDirPath.toUri(), conf);
// This directory will be in a common location, or this may be a cluster
// meant for a single user. Creating based on the conf. Should ideally be
@@ -194,7 +196,7 @@ public class JobHistoryEventHandler exte
//Check/create user directory under intermediate done dir.
try {
doneDirPrefixPath =
- FileSystem.get(conf).makeQualified(new Path(userDoneDirStr));
+ FileContext.getFileContext(conf).makeQualified(new Path(userDoneDirStr));
mkdir(doneDirFS, doneDirPrefixPath, new FsPermission(
JobHistoryUtils.HISTORY_INTERMEDIATE_USER_DIR_PERMISSIONS));
} catch (IOException e) {
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java Tue Aug 19 23:49:39 2014
@@ -66,4 +66,5 @@ public interface AppContext {
boolean hasSuccessfullyUnregistered();
+ String getNMHostname();
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Tue Aug 19 23:49:39 2014
@@ -41,13 +41,13 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LocalContainerLauncher;
import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
@@ -67,6 +67,7 @@ import org.apache.hadoop.mapreduce.jobhi
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@@ -184,7 +185,6 @@ public class MRAppMaster extends Composi
private final int nmPort;
private final int nmHttpPort;
protected final MRAppMetrics metrics;
- private final int maxAppAttempts;
private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
private List<AMInfo> amInfos;
private AppContext context;
@@ -199,6 +199,7 @@ public class MRAppMaster extends Composi
new JobTokenSecretManager();
private JobId jobId;
private boolean newApiCommitter;
+ private ClassLoader jobClassLoader;
private OutputCommitter committer;
private JobEventDispatcher jobEventDispatcher;
private JobHistoryEventHandler jobHistoryEventHandler;
@@ -225,14 +226,14 @@ public class MRAppMaster extends Composi
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
- long appSubmitTime, int maxAppAttempts) {
+ long appSubmitTime) {
this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort,
- new SystemClock(), appSubmitTime, maxAppAttempts);
+ new SystemClock(), appSubmitTime);
}
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
- Clock clock, long appSubmitTime, int maxAppAttempts) {
+ Clock clock, long appSubmitTime) {
super(MRAppMaster.class.getName());
this.clock = clock;
this.startTime = clock.getTime();
@@ -243,25 +244,21 @@ public class MRAppMaster extends Composi
this.nmPort = nmPort;
this.nmHttpPort = nmHttpPort;
this.metrics = MRAppMetrics.create();
- this.maxAppAttempts = maxAppAttempts;
logSyncer = TaskLog.createLogSyncer();
LOG.info("Created MRAppMaster for application " + applicationAttemptId);
}
@Override
protected void serviceInit(final Configuration conf) throws Exception {
+ // create the job classloader if enabled
+ createJobClassLoader(conf);
+
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
initJobCredentialsAndUGI(conf);
context = new RunningAppContext(conf);
- ((RunningAppContext)context).computeIsLastAMRetry();
- LOG.info("The specific max attempts: " + maxAppAttempts +
- " for application: " + appAttemptID.getApplicationId().getId() +
- ". Attempt num: " + appAttemptID.getAttemptId() +
- " is last retry: " + isLastAMRetry);
-
// Job name is the same as the app name util we support DAG of jobs
// for an app later
appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>");
@@ -393,8 +390,13 @@ public class MRAppMaster extends Composi
addIfService(committerEventHandler);
//policy handling preemption requests from RM
- preemptionPolicy = createPreemptionPolicy(conf);
- preemptionPolicy.init(context);
+ callWithJobClassLoader(conf, new Action<Void>() {
+ public Void call(Configuration conf) {
+ preemptionPolicy = createPreemptionPolicy(conf);
+ preemptionPolicy.init(context);
+ return null;
+ }
+ });
//service to handle requests to TaskUmbilicalProtocol
taskAttemptListener = createTaskAttemptListener(context, preemptionPolicy);
@@ -459,33 +461,37 @@ public class MRAppMaster extends Composi
}
private OutputCommitter createOutputCommitter(Configuration conf) {
- OutputCommitter committer = null;
-
- LOG.info("OutputCommitter set in config "
- + conf.get("mapred.output.committer.class"));
-
- if (newApiCommitter) {
- org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID = MRBuilderUtils
- .newTaskId(jobId, 0, TaskType.MAP);
- org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = MRBuilderUtils
- .newTaskAttemptId(taskID, 0);
- TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
- TypeConverter.fromYarn(attemptID));
- OutputFormat outputFormat;
- try {
- outputFormat = ReflectionUtils.newInstance(taskContext
- .getOutputFormatClass(), conf);
- committer = outputFormat.getOutputCommitter(taskContext);
- } catch (Exception e) {
- throw new YarnRuntimeException(e);
+ return callWithJobClassLoader(conf, new Action<OutputCommitter>() {
+ public OutputCommitter call(Configuration conf) {
+ OutputCommitter committer = null;
+
+ LOG.info("OutputCommitter set in config "
+ + conf.get("mapred.output.committer.class"));
+
+ if (newApiCommitter) {
+ org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID =
+ MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+ org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
+ MRBuilderUtils.newTaskAttemptId(taskID, 0);
+ TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
+ TypeConverter.fromYarn(attemptID));
+ OutputFormat outputFormat;
+ try {
+ outputFormat = ReflectionUtils.newInstance(taskContext
+ .getOutputFormatClass(), conf);
+ committer = outputFormat.getOutputCommitter(taskContext);
+ } catch (Exception e) {
+ throw new YarnRuntimeException(e);
+ }
+ } else {
+ committer = ReflectionUtils.newInstance(conf.getClass(
+ "mapred.output.committer.class", FileOutputCommitter.class,
+ org.apache.hadoop.mapred.OutputCommitter.class), conf);
+ }
+ LOG.info("OutputCommitter is " + committer.getClass().getName());
+ return committer;
}
- } else {
- committer = ReflectionUtils.newInstance(conf.getClass(
- "mapred.output.committer.class", FileOutputCommitter.class,
- org.apache.hadoop.mapred.OutputCommitter.class), conf);
- }
- LOG.info("OutputCommitter is " + committer.getClass().getName());
- return committer;
+ });
}
protected AMPreemptionPolicy createPreemptionPolicy(Configuration conf) {
@@ -600,10 +606,6 @@ public class MRAppMaster extends Composi
LOG.warn("Graceful stop failed ", t);
}
- //Bring the process down by force.
- //Not needed after HADOOP-7140
- LOG.info("Exiting MR AppMaster..GoodBye!");
- sysexit();
}
private class JobFinishEventHandler implements EventHandler<JobFinishEvent> {
@@ -677,38 +679,42 @@ public class MRAppMaster extends Composi
return new StagingDirCleaningService();
}
- protected Speculator createSpeculator(Configuration conf, AppContext context) {
- Class<? extends Speculator> speculatorClass;
-
- try {
- speculatorClass
- // "yarn.mapreduce.job.speculator.class"
- = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR,
- DefaultSpeculator.class,
- Speculator.class);
- Constructor<? extends Speculator> speculatorConstructor
- = speculatorClass.getConstructor
- (Configuration.class, AppContext.class);
- Speculator result = speculatorConstructor.newInstance(conf, context);
-
- return result;
- } catch (InstantiationException ex) {
- LOG.error("Can't make a speculator -- check "
- + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
- throw new YarnRuntimeException(ex);
- } catch (IllegalAccessException ex) {
- LOG.error("Can't make a speculator -- check "
- + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
- throw new YarnRuntimeException(ex);
- } catch (InvocationTargetException ex) {
- LOG.error("Can't make a speculator -- check "
- + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
- throw new YarnRuntimeException(ex);
- } catch (NoSuchMethodException ex) {
- LOG.error("Can't make a speculator -- check "
- + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
- throw new YarnRuntimeException(ex);
- }
+ protected Speculator createSpeculator(Configuration conf,
+ final AppContext context) {
+ return callWithJobClassLoader(conf, new Action<Speculator>() {
+ public Speculator call(Configuration conf) {
+ Class<? extends Speculator> speculatorClass;
+ try {
+ speculatorClass
+ // "yarn.mapreduce.job.speculator.class"
+ = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR,
+ DefaultSpeculator.class,
+ Speculator.class);
+ Constructor<? extends Speculator> speculatorConstructor
+ = speculatorClass.getConstructor
+ (Configuration.class, AppContext.class);
+ Speculator result = speculatorConstructor.newInstance(conf, context);
+
+ return result;
+ } catch (InstantiationException ex) {
+ LOG.error("Can't make a speculator -- check "
+ + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+ throw new YarnRuntimeException(ex);
+ } catch (IllegalAccessException ex) {
+ LOG.error("Can't make a speculator -- check "
+ + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+ throw new YarnRuntimeException(ex);
+ } catch (InvocationTargetException ex) {
+ LOG.error("Can't make a speculator -- check "
+ + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+ throw new YarnRuntimeException(ex);
+ } catch (NoSuchMethodException ex) {
+ LOG.error("Can't make a speculator -- check "
+ + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+ throw new YarnRuntimeException(ex);
+ }
+ }
+ });
}
protected TaskAttemptListener createTaskAttemptListener(AppContext context,
@@ -722,7 +728,7 @@ public class MRAppMaster extends Composi
protected EventHandler<CommitterEvent> createCommitterEventHandler(
AppContext context, OutputCommitter committer) {
return new CommitterEventHandler(context, committer,
- getRMHeartbeatHandler());
+ getRMHeartbeatHandler(), jobClassLoader);
}
protected ContainerAllocator createContainerAllocator(
@@ -1009,8 +1015,13 @@ public class MRAppMaster extends Composi
successfullyUnregistered.set(true);
}
- public void computeIsLastAMRetry() {
- isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts;
+ public void resetIsLastAMRetry() {
+ isLastAMRetry = false;
+ }
+
+ @Override
+ public String getNMHostname() {
+ return nmHost;
}
}
@@ -1054,6 +1065,7 @@ public class MRAppMaster extends Composi
// It's more test friendly to put it here.
DefaultMetricsSystem.initialize("MRAppMaster");
+ boolean initFailed = false;
if (!errorHappenedShutDown) {
// create a job event for job intialization
JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
@@ -1062,6 +1074,10 @@ public class MRAppMaster extends Composi
// job-init to be done completely here.
jobEventDispatcher.handle(initJobEvent);
+ // If job is still not initialized, an error happened during
+ // initialization. Must complete starting all of the services so failure
+ // events can be processed.
+ initFailed = (((JobImpl)job).getInternalState() != JobStateInternal.INITED);
// JobImpl's InitTransition is done (call above is synchronous), so the
// "uber-decision" (MR-1220) has been made. Query job and switch to
@@ -1088,10 +1104,16 @@ public class MRAppMaster extends Composi
//start all the components
super.serviceStart();
- // set job classloader if configured
- MRApps.setJobClassLoader(getConfig());
- // All components have started, start the job.
- startJobs();
+ // finally set the job classloader
+ MRApps.setClassLoader(jobClassLoader, getConfig());
+
+ if (initFailed) {
+ JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
+ jobEventDispatcher.handle(initFailedEvent);
+ } else {
+ // All components have started, start the job.
+ startJobs();
+ }
}
@Override
@@ -1100,7 +1122,29 @@ public class MRAppMaster extends Composi
TaskLog.syncLogsShutdown(logSyncer);
}
- private void processRecovery() {
+ private boolean isRecoverySupported() throws IOException {
+ boolean isSupported = false;
+ Configuration conf = getConfig();
+ if (committer != null) {
+ final JobContext _jobContext;
+ if (newApiCommitter) {
+ _jobContext = new JobContextImpl(
+ conf, TypeConverter.fromYarn(getJobId()));
+ } else {
+ _jobContext = new org.apache.hadoop.mapred.JobContextImpl(
+ new JobConf(conf), TypeConverter.fromYarn(getJobId()));
+ }
+ isSupported = callWithJobClassLoader(conf,
+ new ExceptionAction<Boolean>() {
+ public Boolean call(Configuration conf) throws IOException {
+ return committer.isRecoverySupported(_jobContext);
+ }
+ });
+ }
+ return isSupported;
+ }
+
+ private void processRecovery() throws IOException{
if (appAttemptID.getAttemptId() == 1) {
return; // no need to recover on the first attempt
}
@@ -1108,8 +1152,8 @@ public class MRAppMaster extends Composi
boolean recoveryEnabled = getConfig().getBoolean(
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);
- boolean recoverySupportedByCommitter =
- committer != null && committer.isRecoverySupported();
+
+ boolean recoverySupportedByCommitter = isRecoverySupported();
// If a shuffle secret was not provided by the job client then this app
// attempt will generate one. However that disables recovery if there
@@ -1294,7 +1338,7 @@ public class MRAppMaster extends Composi
this.conf = config;
}
@Override
- public void handle(SpeculatorEvent event) {
+ public void handle(final SpeculatorEvent event) {
if (disabled) {
return;
}
@@ -1321,7 +1365,12 @@ public class MRAppMaster extends Composi
if ( (shouldMapSpec && (tType == null || tType == TaskType.MAP))
|| (shouldReduceSpec && (tType == null || tType == TaskType.REDUCE))) {
// Speculator IS enabled, direct the event to there.
- speculator.handle(event);
+ callWithJobClassLoader(conf, new Action<Void>() {
+ public Void call(Configuration conf) {
+ speculator.handle(event);
+ return null;
+ }
+ });
}
}
@@ -1362,8 +1411,6 @@ public class MRAppMaster extends Composi
System.getenv(Environment.NM_HTTP_PORT.name());
String appSubmitTimeStr =
System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
- String maxAppAttempts =
- System.getenv(ApplicationConstants.MAX_APP_ATTEMPTS_ENV);
validateInputParam(containerIdStr,
Environment.CONTAINER_ID.name());
@@ -1373,8 +1420,6 @@ public class MRAppMaster extends Composi
Environment.NM_HTTP_PORT.name());
validateInputParam(appSubmitTimeStr,
ApplicationConstants.APP_SUBMIT_TIME_ENV);
- validateInputParam(maxAppAttempts,
- ApplicationConstants.MAX_APP_ATTEMPTS_ENV);
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
ApplicationAttemptId applicationAttemptId =
@@ -1385,8 +1430,7 @@ public class MRAppMaster extends Composi
MRAppMaster appMaster =
new MRAppMaster(applicationAttemptId, containerId, nodeHostString,
Integer.parseInt(nodePortString),
- Integer.parseInt(nodeHttpPortString), appSubmitTime,
- Integer.parseInt(maxAppAttempts));
+ Integer.parseInt(nodeHttpPortString), appSubmitTime);
ShutdownHookManager.get().addShutdownHook(
new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
JobConf conf = new JobConf(new YarnConfiguration());
@@ -1486,6 +1530,102 @@ public class MRAppMaster extends Composi
});
}
+ /**
+ * Creates a job classloader based on the configuration if the job classloader
+ * is enabled. It is a no-op if the job classloader is not enabled.
+ */
+ private void createJobClassLoader(Configuration conf) throws IOException {
+ jobClassLoader = MRApps.createJobClassLoader(conf);
+ }
+
+ /**
+ * Executes the given action with the job classloader set as the configuration
+ * classloader as well as the thread context class loader if the job
+ * classloader is enabled. After the call, the original classloader is
+ * restored.
+ *
+ * If the job classloader is enabled and the code needs to load user-supplied
+ * classes via configuration or thread context classloader, this method should
+ * be used in order to load them.
+ *
+ * @param conf the configuration on which the classloader will be set
+ * @param action the callable action to be executed
+ */
+ <T> T callWithJobClassLoader(Configuration conf, Action<T> action) {
+ // if the job classloader is enabled, we may need it to load the (custom)
+ // classes; we make the job classloader available and unset it once it is
+ // done
+ ClassLoader currentClassLoader = conf.getClassLoader();
+ boolean setJobClassLoader =
+ jobClassLoader != null && currentClassLoader != jobClassLoader;
+ if (setJobClassLoader) {
+ MRApps.setClassLoader(jobClassLoader, conf);
+ }
+ try {
+ return action.call(conf);
+ } finally {
+ if (setJobClassLoader) {
+ // restore the original classloader
+ MRApps.setClassLoader(currentClassLoader, conf);
+ }
+ }
+ }
+
+ /**
+ * Executes the given action that can throw a checked exception with the job
+ * classloader set as the configuration classloader as well as the thread
+ * context class loader if the job classloader is enabled. After the call, the
+ * original classloader is restored.
+ *
+ * If the job classloader is enabled and the code needs to load user-supplied
+ * classes via configuration or thread context classloader, this method should
+ * be used in order to load them.
+ *
+ * @param conf the configuration on which the classloader will be set
+ * @param action the callable action to be executed
+ * @throws IOException if the underlying action throws an IOException
+ * @throws YarnRuntimeException if the underlying action throws an exception
+ * other than an IOException
+ */
+ <T> T callWithJobClassLoader(Configuration conf, ExceptionAction<T> action)
+ throws IOException {
+ // if the job classloader is enabled, we may need it to load the (custom)
+ // classes; we make the job classloader available and unset it once it is
+ // done
+ ClassLoader currentClassLoader = conf.getClassLoader();
+ boolean setJobClassLoader =
+ jobClassLoader != null && currentClassLoader != jobClassLoader;
+ if (setJobClassLoader) {
+ MRApps.setClassLoader(jobClassLoader, conf);
+ }
+ try {
+ return action.call(conf);
+ } catch (IOException e) {
+ throw e;
+ } catch (YarnRuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ // wrap it with a YarnRuntimeException
+ throw new YarnRuntimeException(e);
+ } finally {
+ if (setJobClassLoader) {
+ // restore the original classloader
+ MRApps.setClassLoader(currentClassLoader, conf);
+ }
+ }
+ }
+
+ /**
+ * Action to be wrapped with setting and unsetting the job classloader
+ */
+ private static interface Action<T> {
+ T call(Configuration conf);
+ }
+
+ private static interface ExceptionAction<T> {
+ T call(Configuration conf) throws Exception;
+ }
+
@Override
protected void serviceStop() throws Exception {
super.serviceStop();
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java Tue Aug 19 23:49:39 2014
@@ -131,7 +131,8 @@ public class MRClientService extends Abs
}
server.start();
- this.bindAddress = NetUtils.getConnectAddress(server);
+ this.bindAddress = NetUtils.createSocketAddrForHost(appContext.getNMHostname(),
+ server.getListenerAddress().getPort());
LOG.info("Instantiated MRClientService at " + this.bindAddress);
try {
// Explicitly disabling SSL for map reduce task as we can't allow MR users
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java Tue Aug 19 23:49:39 2014
@@ -68,6 +68,7 @@ public class CommitterEventHandler exten
private BlockingQueue<CommitterEvent> eventQueue =
new LinkedBlockingQueue<CommitterEvent>();
private final AtomicBoolean stopped;
+ private final ClassLoader jobClassLoader;
private Thread jobCommitThread = null;
private int commitThreadCancelTimeoutMs;
private long commitWindowMs;
@@ -79,11 +80,17 @@ public class CommitterEventHandler exten
public CommitterEventHandler(AppContext context, OutputCommitter committer,
RMHeartbeatHandler rmHeartbeatHandler) {
+ this(context, committer, rmHeartbeatHandler, null);
+ }
+
+ public CommitterEventHandler(AppContext context, OutputCommitter committer,
+ RMHeartbeatHandler rmHeartbeatHandler, ClassLoader jobClassLoader) {
super("CommitterEventHandler");
this.context = context;
this.committer = committer;
this.rmHeartbeatHandler = rmHeartbeatHandler;
this.stopped = new AtomicBoolean(false);
+ this.jobClassLoader = jobClassLoader;
}
@Override
@@ -109,9 +116,23 @@ public class CommitterEventHandler exten
@Override
protected void serviceStart() throws Exception {
- ThreadFactory tf = new ThreadFactoryBuilder()
- .setNameFormat("CommitterEvent Processor #%d")
- .build();
+ ThreadFactoryBuilder tfBuilder = new ThreadFactoryBuilder()
+ .setNameFormat("CommitterEvent Processor #%d");
+ if (jobClassLoader != null) {
+ // if the job classloader is enabled, we need to use the job classloader
+ // as the thread context classloader (TCCL) of these threads in case the
+ // committer needs to load another class via TCCL
+ ThreadFactory backingTf = new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r);
+ thread.setContextClassLoader(jobClassLoader);
+ return thread;
+ }
+ };
+ tfBuilder.setThreadFactory(backingTf);
+ }
+ ThreadFactory tf = tfBuilder.build();
launcherPool = new ThreadPoolExecutor(5, 5, 1,
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
eventHandlingThread = new Thread(new Runnable() {
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java Tue Aug 19 23:49:39 2014
@@ -28,6 +28,7 @@ public enum JobEventType {
//Producer:MRAppMaster
JOB_INIT,
+ JOB_INIT_FAILED,
JOB_START,
//Producer:Task
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue Aug 19 23:49:39 2014
@@ -32,6 +32,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -129,6 +130,8 @@ import org.apache.hadoop.yarn.state.Stat
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/** Implementation of Job interface. Maintains the state machines of Job.
* The read and write calls use ReadWriteLock for concurrency.
*/
@@ -145,10 +148,10 @@ public class JobImpl implements org.apac
private static final Log LOG = LogFactory.getLog(JobImpl.class);
//The maximum fraction of fetch failures allowed for a map
- private static final double MAX_ALLOWED_FETCH_FAILURES_FRACTION = 0.5;
-
- // Maximum no. of fetch-failure notifications after which map task is failed
- private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
+ private float maxAllowedFetchFailuresFraction;
+
+ //Maximum no. of fetch-failure notifications after which map task is failed
+ private int maxFetchFailuresNotifications;
public static final String JOB_KILLED_DIAG =
"Job received Kill while in RUNNING state.";
@@ -250,9 +253,12 @@ public class JobImpl implements org.apac
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition
(JobStateInternal.NEW,
- EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
+ EnumSet.of(JobStateInternal.INITED, JobStateInternal.NEW),
JobEventType.JOB_INIT,
new InitTransition())
+ .addTransition(JobStateInternal.NEW, JobStateInternal.FAIL_ABORT,
+ JobEventType.JOB_INIT_FAILED,
+ new InitFailedTransition())
.addTransition(JobStateInternal.NEW, JobStateInternal.KILLED,
JobEventType.JOB_KILL,
new KillNewJobTransition())
@@ -265,7 +271,7 @@ public class JobImpl implements org.apac
// Ignore-able events
.addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
JobEventType.JOB_UPDATED_NODES)
-
+
// Transitions from INITED state
.addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
@@ -641,8 +647,8 @@ public class JobImpl implements org.apac
private JobStateInternal forcedState = null;
- //Executor used for running future tasks. Setting thread pool size to 1
- private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+ //Executor used for running future tasks.
+ private ScheduledThreadPoolExecutor executor;
private ScheduledFuture failWaitTriggerScheduledFuture;
private JobState lastNonFinalState = JobState.NEW;
@@ -684,6 +690,13 @@ public class JobImpl implements org.apac
this.aclsManager = new JobACLsManager(conf);
this.username = System.getProperty("user.name");
this.jobACLs = aclsManager.constructJobACLs(conf);
+
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
+ .setNameFormat("Job Fail Wait Timeout Monitor #%d")
+ .setDaemon(true)
+ .build();
+ this.executor = new ScheduledThreadPoolExecutor(1, threadFactory);
+
// This "this leak" is okay because the retained pointer is in an
// instance variable.
stateMachine = stateMachineFactory.make(this);
@@ -691,6 +704,13 @@ public class JobImpl implements org.apac
if(forcedDiagnostic != null) {
this.diagnostics.add(forcedDiagnostic);
}
+
+ this.maxAllowedFetchFailuresFraction = conf.getFloat(
+ MRJobConfig.MAX_ALLOWED_FETCH_FAILURES_FRACTION,
+ MRJobConfig.DEFAULT_MAX_ALLOWED_FETCH_FAILURES_FRACTION);
+ this.maxFetchFailuresNotifications = conf.getInt(
+ MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS,
+ MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS);
}
protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
@@ -717,7 +737,7 @@ public class JobImpl implements org.apac
if (jobACL == null) {
return true;
}
- return aclsManager.checkAccess(callerUGI, jobOperation, username, jobACL);
+ return aclsManager.checkAccess(callerUGI, jobOperation, userName, jobACL);
}
@Override
@@ -1205,22 +1225,25 @@ public class JobImpl implements org.apac
boolean smallNumReduceTasks = (numReduceTasks <= sysMaxReduces);
boolean smallInput = (dataInputLength <= sysMaxBytes);
// ignoring overhead due to UberAM and statics as negligible here:
+ long requiredMapMB = conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0);
+ long requiredReduceMB = conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0);
+ long requiredMB = Math.max(requiredMapMB, requiredReduceMB);
+ int requiredMapCores = conf.getInt(
+ MRJobConfig.MAP_CPU_VCORES,
+ MRJobConfig.DEFAULT_MAP_CPU_VCORES);
+ int requiredReduceCores = conf.getInt(
+ MRJobConfig.REDUCE_CPU_VCORES,
+ MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
+ int requiredCores = Math.max(requiredMapCores, requiredReduceCores);
+ if (numReduceTasks == 0) {
+ requiredMB = requiredMapMB;
+ requiredCores = requiredMapCores;
+ }
boolean smallMemory =
- ( (Math.max(conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0),
- conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0))
- <= sysMemSizeForUberSlot)
- || (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT));
- boolean smallCpu =
- (
- Math.max(
- conf.getInt(
- MRJobConfig.MAP_CPU_VCORES,
- MRJobConfig.DEFAULT_MAP_CPU_VCORES),
- conf.getInt(
- MRJobConfig.REDUCE_CPU_VCORES,
- MRJobConfig.DEFAULT_REDUCE_CPU_VCORES))
- <= sysCPUSizeForUberSlot
- );
+ (requiredMB <= sysMemSizeForUberSlot)
+ || (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT);
+
+ boolean smallCpu = requiredCores <= sysCPUSizeForUberSlot;
boolean notChainJob = !isChainJob(conf);
// User has overall veto power over uberization, or user can modify
@@ -1285,6 +1308,7 @@ public class JobImpl implements org.apac
}
} catch (ClassNotFoundException cnfe) {
// don't care; assume it's not derived from ChainMapper
+ } catch (NoClassDefFoundError ignored) {
}
try {
String reduceClassName = conf.get(MRJobConfig.REDUCE_CLASS_ATTR);
@@ -1295,6 +1319,7 @@ public class JobImpl implements org.apac
}
} catch (ClassNotFoundException cnfe) {
// don't care; assume it's not derived from ChainReducer
+ } catch (NoClassDefFoundError ignored) {
}
return isChainJob;
}
@@ -1374,6 +1399,15 @@ public class JobImpl implements org.apac
public JobStateInternal transition(JobImpl job, JobEvent event) {
job.metrics.submittedJob(job);
job.metrics.preparingJob(job);
+
+ if (job.newApiCommitter) {
+ job.jobContext = new JobContextImpl(job.conf,
+ job.oldJobId);
+ } else {
+ job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
+ job.conf, job.oldJobId);
+ }
+
try {
setup(job);
job.fs = job.getFileSystem(job.conf);
@@ -1409,14 +1443,6 @@ public class JobImpl implements org.apac
checkTaskLimits();
- if (job.newApiCommitter) {
- job.jobContext = new JobContextImpl(job.conf,
- job.oldJobId);
- } else {
- job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
- job.conf, job.oldJobId);
- }
-
long inputLength = 0;
for (int i = 0; i < job.numMapTasks; ++i) {
inputLength += taskSplitMetaInfo[i].getInputDataLength();
@@ -1443,15 +1469,14 @@ public class JobImpl implements org.apac
job.metrics.endPreparingJob(job);
return JobStateInternal.INITED;
- } catch (IOException e) {
+ } catch (Exception e) {
LOG.warn("Job init failed", e);
job.metrics.endPreparingJob(job);
job.addDiagnostic("Job init failed : "
+ StringUtils.stringifyException(e));
- job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
- job.jobContext,
- org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
- return JobStateInternal.FAILED;
+ // Leave job in the NEW state. The MR AM will detect that the state is
+ // not INITED and send a JOB_INIT_FAILED event.
+ return JobStateInternal.NEW;
}
}
@@ -1552,6 +1577,16 @@ public class JobImpl implements org.apac
}
} // end of InitTransition
+ private static class InitFailedTransition
+ implements SingleArcTransition<JobImpl, JobEvent> {
+ @Override
+ public void transition(JobImpl job, JobEvent event) {
+ job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
+ job.jobContext,
+ org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
+ }
+ }
+
private static class SetupCompletedTransition
implements SingleArcTransition<JobImpl, JobEvent> {
@Override
@@ -1872,9 +1907,8 @@ public class JobImpl implements org.apac
float failureRate = shufflingReduceTasks == 0 ? 1.0f :
(float) fetchFailures / shufflingReduceTasks;
// declare faulty if fetch-failures >= max-allowed-failures
- boolean isMapFaulty =
- (failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION);
- if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS && isMapFaulty) {
+ if (fetchFailures >= job.getMaxFetchFailuresNotifications()
+ && failureRate >= job.getMaxAllowedFetchFailuresFraction()) {
LOG.info("Too many fetch-failures for output of task attempt: " +
mapId + " ... raising fetch failure to map");
job.eventHandler.handle(new TaskAttemptEvent(mapId,
@@ -2157,4 +2191,12 @@ public class JobImpl implements org.apac
jobConf.addResource(fc.open(confPath), confPath.toString());
return jobConf;
}
+
+ public float getMaxAllowedFetchFailuresFraction() {
+ return maxAllowedFetchFailuresFraction;
+ }
+
+ public int getMaxFetchFailuresNotifications() {
+ return maxFetchFailuresNotifications;
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Tue Aug 19 23:49:39 2014
@@ -335,6 +335,15 @@ public abstract class TaskAttemptImpl im
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
+ // The AM is likely to receive duplicate TA_COMMIT_PENDINGs, because the task
+ // attempt re-sends the commit message until it is delivered without
+ // encountering an IOException.
+ // Ignoring the duplicate commit message is a short-term fix. In the long
+ // term, we need to make use of a retry cache to help this and other MR
+ // protocol APIs that can be considered @AtMostOnce.
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+ TaskAttemptStateInternal.COMMIT_PENDING,
+ TaskAttemptEventType.TA_COMMIT_PENDING)
// Transitions from SUCCESS_CONTAINER_CLEANUP state
// kill and cleanup the container
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Tue Aug 19 23:49:39 2014
@@ -249,8 +249,16 @@ public abstract class TaskImpl implement
TaskEventType.T_ATTEMPT_SUCCEEDED))
// Transitions from KILLED state
+ // There could be a race condition where TaskImpl might receive
+ // T_ATTEMPT_SUCCEEDED followed by T_ATTEMPT_KILLED for the same attempt.
+ // a. The task is in KILL_WAIT.
+ // b. Before TA transitions to SUCCEEDED state, Task sends TA_KILL event.
+ // c. TA transitions to SUCCEEDED state and thus sends T_ATTEMPT_SUCCEEDED
+ // to the task. The task transitions to KILLED state.
+ // d. TA processes TA_KILL event and sends T_ATTEMPT_KILLED to the task.
.addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
EnumSet.of(TaskEventType.T_KILL,
+ TaskEventType.T_ATTEMPT_KILLED,
TaskEventType.T_ADD_SPEC_ATTEMPT))
// create the topology tables
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Tue Aug 19 23:49:39 2014
@@ -64,6 +64,7 @@ public class LocalContainerAllocator ext
private int nmPort;
private int nmHttpPort;
private ContainerId containerId;
+ protected int lastResponseID;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@@ -119,6 +120,11 @@ public class LocalContainerAllocator ext
if (allocateResponse.getAMCommand() != null) {
switch(allocateResponse.getAMCommand()) {
case AM_RESYNC:
+ LOG.info("ApplicationMaster is out of sync with ResourceManager,"
+ + " hence resyncing.");
+ this.lastResponseID = 0;
+ register();
+ break;
case AM_SHUTDOWN:
LOG.info("Event from RM: shutting down Application Master");
// This can happen if the RM has been restarted. If it is in that state,
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Tue Aug 19 23:49:39 2014
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -185,7 +186,7 @@ public abstract class RMCommunicator ext
// if unregistration failed, isLastAMRetry needs to be recalculated
// to see whether AM really has the chance to retry
RunningAppContext raContext = (RunningAppContext) context;
- raContext.computeIsLastAMRetry();
+ raContext.resetIsLastAMRetry();
}
}
@@ -216,20 +217,27 @@ public abstract class RMCommunicator ext
FinishApplicationMasterRequest request =
FinishApplicationMasterRequest.newInstance(finishState,
sb.toString(), historyUrl);
- while (true) {
- FinishApplicationMasterResponse response =
- scheduler.finishApplicationMaster(request);
- if (response.getIsUnregistered()) {
- // When excepting ClientService, other services are already stopped,
- // it is safe to let clients know the final states. ClientService
- // should wait for some time so clients have enough time to know the
- // final states.
- RunningAppContext raContext = (RunningAppContext) context;
- raContext.markSuccessfulUnregistration();
- break;
+ try {
+ while (true) {
+ FinishApplicationMasterResponse response =
+ scheduler.finishApplicationMaster(request);
+ if (response.getIsUnregistered()) {
+ // When excepting ClientService, other services are already stopped,
+ // it is safe to let clients know the final states. ClientService
+ // should wait for some time so clients have enough time to know the
+ // final states.
+ RunningAppContext raContext = (RunningAppContext) context;
+ raContext.markSuccessfulUnregistration();
+ break;
+ }
+ LOG.info("Waiting for application to be successfully unregistered.");
+ Thread.sleep(rmPollInterval);
}
- LOG.info("Waiting for application to be successfully unregistered.");
- Thread.sleep(rmPollInterval);
+ } catch (ApplicationMasterNotRegisteredException e) {
+ // RM might have restarted or failed over and so lost the fact that AM had
+ // registered before.
+ register();
+ doUnregistration();
}
}