You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC
svn commit: r1619012 [12/26] - in
/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./
hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api...
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Tue Aug 19 23:49:39 2014
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -43,6 +44,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -64,6 +66,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppStartAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -115,6 +118,7 @@ public class RMAppImpl implements RMApp,
private EventHandler handler;
private static final AppFinishedTransition FINISHED_TRANSITION =
new AppFinishedTransition();
+ private Set<NodeId> ranNodes = new ConcurrentSkipListSet<NodeId>();
// These states stored are only valid when app is at killing or final_saving.
private RMAppState stateBeforeKilling;
@@ -162,6 +166,8 @@ public class RMAppImpl implements RMApp,
RMAppEventType.APP_REJECTED,
new FinalSavingTransition(new AppRejectedTransition(),
RMAppState.FAILED))
+ .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
+ RMAppEventType.MOVE, new RMAppMoveTransition())
// Transitions from SUBMITTED state
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
@@ -179,7 +185,6 @@ public class RMAppImpl implements RMApp,
new FinalSavingTransition(
new AppKilledTransition(), RMAppState.KILLED))
-
// Transitions from ACCEPTED state
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
@@ -199,12 +204,9 @@ public class RMAppImpl implements RMApp,
new FinalSavingTransition(FINISHED_TRANSITION, RMAppState.FINISHED))
.addTransition(RMAppState.ACCEPTED, RMAppState.KILLING,
RMAppEventType.KILL, new KillAttemptTransition())
- // ACCECPTED state can once again receive APP_ACCEPTED event, because on
- // recovery the app returns ACCEPTED state and the app once again go
- // through the scheduler and triggers one more APP_ACCEPTED event at
- // ACCEPTED state.
- .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
- RMAppEventType.APP_ACCEPTED)
+ .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
// Transitions from RUNNING state
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
@@ -219,6 +221,9 @@ public class RMAppImpl implements RMApp,
.addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
// UnManagedAM directly jumps to finished
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
+ .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
.addTransition(RMAppState.RUNNING,
EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
RMAppEventType.ATTEMPT_FAILED,
@@ -234,22 +239,31 @@ public class RMAppImpl implements RMApp,
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
RMAppEventType.ATTEMPT_FINISHED,
new AttemptFinishedAtFinalSavingTransition())
+ .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
// ignorable transitions
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
- RMAppEventType.APP_NEW_SAVED))
+ RMAppEventType.APP_NEW_SAVED, RMAppEventType.MOVE))
// Transitions from FINISHING state
.addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
+ .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
// ignorable transitions
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
EnumSet.of(RMAppEventType.NODE_UPDATE,
- // ignore Kill as we have already saved the final Finished state in
- // state store.
- RMAppEventType.KILL))
+ // ignore Kill/Move as we have already saved the final Finished state
+ // in state store.
+ RMAppEventType.KILL, RMAppEventType.MOVE))
// Transitions from KILLING state
+ .addTransition(RMAppState.KILLING, RMAppState.KILLING,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
.addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
RMAppEventType.ATTEMPT_KILLED,
new FinalSavingTransition(
@@ -262,42 +276,47 @@ public class RMAppImpl implements RMApp,
RMAppEventType.ATTEMPT_FINISHED,
RMAppEventType.ATTEMPT_FAILED,
RMAppEventType.APP_UPDATE_SAVED,
- RMAppEventType.KILL))
+ RMAppEventType.KILL, RMAppEventType.MOVE))
// Transitions from FINISHED state
// ignorable transitions
+ .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
EnumSet.of(
RMAppEventType.NODE_UPDATE,
RMAppEventType.ATTEMPT_UNREGISTERED,
RMAppEventType.ATTEMPT_FINISHED,
- RMAppEventType.KILL))
+ RMAppEventType.KILL, RMAppEventType.MOVE))
// Transitions from FAILED state
// ignorable transitions
+ .addTransition(RMAppState.FAILED, RMAppState.FAILED,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
.addTransition(RMAppState.FAILED, RMAppState.FAILED,
- EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE))
+ EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE,
+ RMAppEventType.MOVE))
// Transitions from KILLED state
// ignorable transitions
+ .addTransition(RMAppState.KILLED, RMAppState.KILLED,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
.addTransition(
RMAppState.KILLED,
RMAppState.KILLED,
EnumSet.of(RMAppEventType.APP_ACCEPTED,
RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
- RMAppEventType.NODE_UPDATE))
+ RMAppEventType.NODE_UPDATE, RMAppEventType.MOVE))
.installTopology();
private final StateMachine<RMAppState, RMAppEventType, RMAppEvent>
stateMachine;
- private static final ApplicationResourceUsageReport
- DUMMY_APPLICATION_RESOURCE_USAGE_REPORT =
- BuilderUtils.newApplicationResourceUsageReport(-1, -1,
- Resources.createResource(-1, -1), Resources.createResource(-1, -1),
- Resources.createResource(-1, -1));
private static final int DUMMY_APPLICATION_ATTEMPT_NUMBER = -1;
public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
@@ -498,7 +517,7 @@ public class RMAppImpl implements RMApp,
String origTrackingUrl = UNAVAILABLE;
int rpcPort = -1;
ApplicationResourceUsageReport appUsageReport =
- DUMMY_APPLICATION_RESOURCE_USAGE_REPORT;
+ RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT;
FinalApplicationStatus finishState = getFinalApplicationStatus();
String diags = UNAVAILABLE;
float progress = 0.0f;
@@ -666,7 +685,12 @@ public class RMAppImpl implements RMApp,
ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
RMAppAttempt attempt =
new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
- submissionContext, conf, maxAppAttempts == attempts.size());
+ submissionContext, conf,
+ // The newly created attempt may be the last attempt if (number of
+ // previously failed attempts (which should not include preemption,
+ // hardware error and NM resync) + 1) equals the max-attempt
+ // limit.
+ maxAppAttempts == (getNumFailedAppAttempts() + 1));
attempts.put(appAttemptId, attempt);
currentAttempt = attempt;
}
@@ -699,6 +723,23 @@ public class RMAppImpl implements RMApp,
nodeUpdateEvent.getNode());
};
}
+
+ private static final class AppRunningOnNodeTransition extends RMAppTransition {
+ public void transition(RMAppImpl app, RMAppEvent event) {
+ RMAppRunningOnNodeEvent nodeAddedEvent = (RMAppRunningOnNodeEvent) event;
+
+ // if final state already stored, notify RMNode
+ if (isAppInFinalState(app)) {
+ app.handler.handle(
+ new RMNodeCleanAppEvent(nodeAddedEvent.getNodeId(), nodeAddedEvent
+ .getApplicationId()));
+ return;
+ }
+
+ // otherwise, add it to ranNodes for further processing
+ app.ranNodes.add(nodeAddedEvent.getNodeId());
+ };
+ }
/**
* Move an app to a new queue.
@@ -723,46 +764,54 @@ public class RMAppImpl implements RMApp,
}
}
+ // synchronously recover attempts to ensure any incoming external events
+ // are processed after the attempts have processed the recover event.
+ private void recoverAppAttempts() {
+ for (RMAppAttempt attempt : getAppAttempts().values()) {
+ attempt.handle(new RMAppAttemptEvent(attempt.getAppAttemptId(),
+ RMAppAttemptEventType.RECOVER));
+ }
+ }
+
private static final class RMAppRecoveredTransition implements
MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
- for (RMAppAttempt attempt : app.getAppAttempts().values()) {
- // synchronously recover attempt to ensure any incoming external events
- // to be processed after the attempt processes the recover event.
- attempt.handle(
- new RMAppAttemptEvent(attempt.getAppAttemptId(),
- RMAppAttemptEventType.RECOVER));
- }
-
// The app has completed.
if (app.recoveredFinalState != null) {
+ app.recoverAppAttempts();
new FinalTransition(app.recoveredFinalState).transition(app, event);
return app.recoveredFinalState;
}
- // Last attempt is in final state, do not add to scheduler and just return
- // ACCEPTED waiting for last RMAppAttempt to send finished or failed event
- // back.
+ // No existing attempts means the attempt associated with this app was not
+ // started, or was started but not yet saved.
+ if (app.attempts.isEmpty()) {
+ app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
+ app.submissionContext.getQueue(), app.user));
+ return RMAppState.SUBMITTED;
+ }
+
+ // Add application to scheduler synchronously to guarantee scheduler
+ // knows applications before AM or NM re-registers.
+ app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
+ app.submissionContext.getQueue(), app.user, true));
+
+ // recover attempts
+ app.recoverAppAttempts();
+
+ // Last attempt is in final state, return ACCEPTED waiting for last
+ // RMAppAttempt to send finished or failed event back.
if (app.currentAttempt != null
&& (app.currentAttempt.getState() == RMAppAttemptState.KILLED
|| app.currentAttempt.getState() == RMAppAttemptState.FINISHED
|| (app.currentAttempt.getState() == RMAppAttemptState.FAILED
- && app.attempts.size() == app.maxAppAttempts))) {
+ && app.getNumFailedAppAttempts() == app.maxAppAttempts))) {
return RMAppState.ACCEPTED;
}
- // Notify scheduler about the app on recovery
- new AddApplicationToSchedulerTransition().transition(app, event);
-
- // No existent attempts means the attempt associated with this app was not
- // started or started but not yet saved.
- if (app.attempts.isEmpty()) {
- return RMAppState.SUBMITTED;
- }
-
// YARN-1507 is saving the application state after the application is
// accepted. So after YARN-1507, an app is saved meaning it is accepted.
// Thus we return ACCECPTED state on recovery.
@@ -774,17 +823,6 @@ public class RMAppImpl implements RMApp,
RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
- if (event instanceof RMAppNewSavedEvent) {
- RMAppNewSavedEvent storeEvent = (RMAppNewSavedEvent) event;
- // For HA this exception needs to be handled by giving up
- // master status if we got fenced
- if (((RMAppNewSavedEvent) event).getStoredException() != null) {
- LOG.error(
- "Failed to store application: " + storeEvent.getApplicationId(),
- storeEvent.getStoredException());
- ExitUtil.terminate(1, storeEvent.getStoredException());
- }
- }
app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
app.submissionContext.getQueue(), app.user));
}
@@ -802,13 +840,6 @@ public class RMAppImpl implements RMApp,
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
- RMAppUpdateSavedEvent storeEvent = (RMAppUpdateSavedEvent) event;
- if (storeEvent.getUpdatedException() != null) {
- LOG.error("Failed to update the final state of application"
- + storeEvent.getApplicationId(), storeEvent.getUpdatedException());
- ExitUtil.terminate(1, storeEvent.getUpdatedException());
- }
-
if (app.transitionTodo instanceof SingleArcTransition) {
((SingleArcTransition) app.transitionTodo).transition(app,
app.eventCausingFinalSaving);
@@ -844,7 +875,7 @@ public class RMAppImpl implements RMApp,
msg = "Unmanaged application " + this.getApplicationId()
+ " failed due to " + failedEvent.getDiagnostics()
+ ". Failing the application.";
- } else if (this.attempts.size() >= this.maxAppAttempts) {
+ } else if (getNumFailedAppAttempts() >= this.maxAppAttempts) {
msg = "Application " + this.getApplicationId() + " failed "
+ this.maxAppAttempts + " times due to "
+ failedEvent.getDiagnostics() + ". Failing the application.";
@@ -1037,17 +1068,8 @@ public class RMAppImpl implements RMApp,
this.finalState = finalState;
}
- private Set<NodeId> getNodesOnWhichAttemptRan(RMAppImpl app) {
- Set<NodeId> nodes = new HashSet<NodeId>();
- for (RMAppAttempt attempt : app.attempts.values()) {
- nodes.addAll(attempt.getRanNodes());
- }
- return nodes;
- }
-
public void transition(RMAppImpl app, RMAppEvent event) {
- Set<NodeId> nodes = getNodesOnWhichAttemptRan(app);
- for (NodeId nodeId : nodes) {
+ for (NodeId nodeId : app.getRanNodes()) {
app.handler.handle(
new RMNodeCleanAppEvent(nodeId, app.applicationId));
}
@@ -1055,8 +1077,12 @@ public class RMAppImpl implements RMApp,
if (app.finishTime == 0 ) {
app.finishTime = System.currentTimeMillis();
}
- app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId,
- finalState));
+ // Recovered apps that are completed were not added to scheduler, so no
+ // need to remove them from scheduler.
+ if (app.recoveredFinalState == null) {
+ app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId,
+ finalState));
+ }
app.handler.handle(
new RMAppManagerEvent(app.applicationId,
RMAppManagerEventType.APP_COMPLETED));
@@ -1066,6 +1092,18 @@ public class RMAppImpl implements RMApp,
};
}
+ private int getNumFailedAppAttempts() {
+ int completedAttempts = 0;
+ // Do not count AM preemption, hardware failures or NM resync
+ // as attempt failure.
+ for (RMAppAttempt attempt : attempts.values()) {
+ if (attempt.shouldCountTowardsMaxAttemptRetry()) {
+ completedAttempts++;
+ }
+ }
+ return completedAttempts;
+ }
+
private static final class AttemptFailedTransition implements
MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
@@ -1077,8 +1115,9 @@ public class RMAppImpl implements RMApp,
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
+
if (!app.submissionContext.getUnmanagedAM()
- && app.attempts.size() < app.maxAppAttempts) {
+ && app.getNumFailedAppAttempts() < app.maxAppAttempts) {
boolean transferStateFromPreviousAttempt = false;
RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
transferStateFromPreviousAttempt =
@@ -1137,6 +1176,9 @@ public class RMAppImpl implements RMApp,
public static boolean isAppInFinalState(RMApp rmApp) {
RMAppState appState = ((RMAppImpl) rmApp).getRecoveredFinalState();
+ if (appState == null) {
+ appState = rmApp.getState();
+ }
return appState == RMAppState.FAILED || appState == RMAppState.FINISHED
|| appState == RMAppState.KILLED;
}
@@ -1144,4 +1186,30 @@ public class RMAppImpl implements RMApp,
private RMAppState getRecoveredFinalState() {
return this.recoveredFinalState;
}
+
+ @Override
+ public Set<NodeId> getRanNodes() {
+ return ranNodes;
+ }
+
+ @Override
+ public RMAppMetrics getRMAppMetrics() {
+ Resource resourcePreempted = Resource.newInstance(0, 0);
+ int numAMContainerPreempted = 0;
+ int numNonAMContainerPreempted = 0;
+ for (RMAppAttempt attempt : attempts.values()) {
+ if (null != attempt) {
+ RMAppAttemptMetrics attemptMetrics =
+ attempt.getRMAppAttemptMetrics();
+ Resources.addTo(resourcePreempted,
+ attemptMetrics.getResourcePreempted());
+ numAMContainerPreempted += attemptMetrics.getIsPreempted() ? 1 : 0;
+ numNonAMContainerPreempted +=
+ attemptMetrics.getNumNonAMContainersPreempted();
+ }
+ }
+
+ return new RMAppMetrics(resourcePreempted,
+ numNonAMContainerPreempted, numAMContainerPreempted);
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java Tue Aug 19 23:49:39 2014
@@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import java.util.List;
-import java.util.Set;
import javax.crypto.SecretKey;
@@ -32,7 +31,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -115,12 +113,6 @@ public interface RMAppAttempt extends Ev
FinalApplicationStatus getFinalApplicationStatus();
/**
- * Nodes on which the containers for this {@link RMAppAttempt} ran.
- * @return the set of nodes that ran any containers from this {@link RMAppAttempt}
- */
- Set<NodeId> getRanNodes();
-
- /**
* Return a list of the last set of finished containers, resetting the
* finished containers to empty.
* @return the list of just finished containers, re setting the finished containers.
@@ -204,4 +196,21 @@ public interface RMAppAttempt extends Ev
*/
ApplicationAttemptReport createApplicationAttemptReport();
+ /**
+ * Return the flag which indicates whether the attempt failure should be
+ * counted towards the attempt retry count.
+ * These failure types should not be counted towards the attempt retry count:
+ * <ul>
+ * <li>preempted by the scheduler.</li>
+ * <li>hardware failures, such as NM failing, lost NM and NM disk errors.</li>
+ * <li>killed by RM because of RM restart or failover.</li>
+ * </ul>
+ */
+ boolean shouldCountTowardsMaxAttemptRetry();
+
+ /**
+ * Get metrics from the {@link RMAppAttempt}
+ * @return metrics
+ */
+ RMAppAttemptMetrics getRMAppAttemptMetrics();
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java Tue Aug 19 23:49:39 2014
@@ -36,7 +36,6 @@ public enum RMAppAttemptEventType {
UNREGISTERED,
// Source: Containers
- CONTAINER_ACQUIRED,
CONTAINER_ALLOCATED,
CONTAINER_FINISHED,
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Tue Aug 19 23:49:39 2014
@@ -26,20 +26,18 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import javax.crypto.SecretKey;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@@ -51,12 +49,11 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -80,15 +77,13 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -103,6 +98,8 @@ import org.apache.hadoop.yarn.state.Stat
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import com.google.common.annotations.VisibleForTesting;
+
@SuppressWarnings({"unchecked", "rawtypes"})
public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
@@ -133,10 +130,7 @@ public class RMAppAttemptImpl implements
private final ApplicationSubmissionContext submissionContext;
private Token<AMRMTokenIdentifier> amrmToken = null;
private SecretKey clientTokenMasterKey = null;
-
- //nodes on while this attempt's containers ran
- private Set<NodeId> ranNodes =
- new HashSet<NodeId>();
+
private List<ContainerStatus> justFinishedContainers =
new ArrayList<ContainerStatus>();
private Container masterContainer;
@@ -152,9 +146,15 @@ public class RMAppAttemptImpl implements
// if an RMAppAttemptUnregistrationEvent occurs
private FinalApplicationStatus finalStatus = null;
private final StringBuilder diagnostics = new StringBuilder();
+ private int amContainerExitStatus = ContainerExitStatus.INVALID;
private Configuration conf;
- private final boolean isLastAttempt;
+ // Since AM preemption, hardware error and NM resync are not counted towards
+ // AM failure count, even if this flag is true, a new attempt can still be
+ // re-created if this attempt is eventually failed because of preemption,
+ // hardware error or NM resync. So this flag indicates that this may be
+ // the last attempt.
+ private final boolean maybeLastAttempt;
private static final ExpiredTransition EXPIRED_TRANSITION =
new ExpiredTransition();
@@ -163,6 +163,8 @@ public class RMAppAttemptImpl implements
private RMAppAttemptState recoveredFinalState;
private RMAppAttemptState stateBeforeFinalSaving;
private Object transitionTodo;
+
+ private RMAppAttemptMetrics attemptMetrics = null;
private static final StateMachineFactory<RMAppAttemptImpl,
RMAppAttemptState,
@@ -214,21 +216,30 @@ public class RMAppAttemptImpl implements
RMAppAttemptEventType.KILL,
new FinalSavingTransition(new BaseFinalTransition(
RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
-
+ .addTransition(RMAppAttemptState.SCHEDULED,
+ RMAppAttemptState.FINAL_SAVING,
+ RMAppAttemptEventType.CONTAINER_FINISHED,
+ new FinalSavingTransition(
+ new AMContainerCrashedBeforeRunningTransition(),
+ RMAppAttemptState.FAILED))
+
// Transitions from ALLOCATED_SAVING State
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptState.ALLOCATED,
RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())
- .addTransition(RMAppAttemptState.ALLOCATED_SAVING,
- RMAppAttemptState.ALLOCATED_SAVING,
- RMAppAttemptEventType.CONTAINER_ACQUIRED,
- new ContainerAcquiredTransition())
+
// App could be killed by the client. So need to handle this.
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.KILL,
new FinalSavingTransition(new BaseFinalTransition(
RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
+ .addTransition(RMAppAttemptState.ALLOCATED_SAVING,
+ RMAppAttemptState.FINAL_SAVING,
+ RMAppAttemptEventType.CONTAINER_FINISHED,
+ new FinalSavingTransition(
+ new AMContainerCrashedBeforeRunningTransition(),
+ RMAppAttemptState.FAILED))
// Transitions from LAUNCHED_UNMANAGED_SAVING State
.addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
@@ -249,10 +260,6 @@ public class RMAppAttemptImpl implements
RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
// Transitions from ALLOCATED State
- .addTransition(RMAppAttemptState.ALLOCATED,
- RMAppAttemptState.ALLOCATED,
- RMAppAttemptEventType.CONTAINER_ACQUIRED,
- new ContainerAcquiredTransition())
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.LAUNCHED,
RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition())
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
@@ -267,15 +274,17 @@ public class RMAppAttemptImpl implements
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.CONTAINER_FINISHED,
new FinalSavingTransition(
- new AMContainerCrashedTransition(), RMAppAttemptState.FAILED))
+ new AMContainerCrashedBeforeRunningTransition(), RMAppAttemptState.FAILED))
// Transitions from LAUNCHED State
.addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING,
RMAppAttemptEventType.REGISTERED, new AMRegisteredTransition())
- .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING,
+ .addTransition(RMAppAttemptState.LAUNCHED,
+ EnumSet.of(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING),
RMAppAttemptEventType.CONTAINER_FINISHED,
- new FinalSavingTransition(
- new AMContainerCrashedTransition(), RMAppAttemptState.FAILED))
+ new ContainerFinishedTransition(
+ new AMContainerCrashedBeforeRunningTransition(),
+ RMAppAttemptState.LAUNCHED))
.addTransition(
RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.EXPIRE,
@@ -295,14 +304,12 @@ public class RMAppAttemptImpl implements
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
RMAppAttemptEventType.CONTAINER_ALLOCATED)
.addTransition(
- RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
- RMAppAttemptEventType.CONTAINER_ACQUIRED,
- new ContainerAcquiredTransition())
- .addTransition(
RMAppAttemptState.RUNNING,
EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING),
RMAppAttemptEventType.CONTAINER_FINISHED,
- new ContainerFinishedTransition())
+ new ContainerFinishedTransition(
+ new AMContainerCrashedAtRunningTransition(),
+ RMAppAttemptState.RUNNING))
.addTransition(
RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.EXPIRE,
@@ -333,7 +340,6 @@ public class RMAppAttemptImpl implements
// should be fixed to reject container allocate request at Final
// Saving in scheduler
RMAppAttemptEventType.CONTAINER_ALLOCATED,
- RMAppAttemptEventType.CONTAINER_ACQUIRED,
RMAppAttemptEventType.ATTEMPT_NEW_SAVED,
RMAppAttemptEventType.KILL))
@@ -343,7 +349,7 @@ public class RMAppAttemptImpl implements
// use by the next new attempt.
.addTransition(RMAppAttemptState.FAILED, RMAppAttemptState.FAILED,
RMAppAttemptEventType.CONTAINER_FINISHED,
- new ContainerFinishedAtFailedTransition())
+ new ContainerFinishedAtFinalStateTransition())
.addTransition(
RMAppAttemptState.FAILED,
RMAppAttemptState.FAILED,
@@ -379,31 +385,36 @@ public class RMAppAttemptImpl implements
RMAppAttemptEventType.EXPIRE,
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.CONTAINER_ALLOCATED,
- RMAppAttemptEventType.CONTAINER_FINISHED,
RMAppAttemptEventType.KILL))
+ .addTransition(RMAppAttemptState.FINISHED,
+ RMAppAttemptState.FINISHED,
+ RMAppAttemptEventType.CONTAINER_FINISHED,
+ new ContainerFinishedAtFinalStateTransition())
// Transitions from KILLED State
.addTransition(
RMAppAttemptState.KILLED,
RMAppAttemptState.KILLED,
EnumSet.of(RMAppAttemptEventType.ATTEMPT_ADDED,
- RMAppAttemptEventType.EXPIRE,
RMAppAttemptEventType.LAUNCHED,
RMAppAttemptEventType.LAUNCH_FAILED,
RMAppAttemptEventType.EXPIRE,
RMAppAttemptEventType.REGISTERED,
RMAppAttemptEventType.CONTAINER_ALLOCATED,
- RMAppAttemptEventType.CONTAINER_FINISHED,
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.KILL,
RMAppAttemptEventType.STATUS_UPDATE))
+ .addTransition(RMAppAttemptState.KILLED,
+ RMAppAttemptState.KILLED,
+ RMAppAttemptEventType.CONTAINER_FINISHED,
+ new ContainerFinishedAtFinalStateTransition())
.installTopology();
public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
RMContext rmContext, YarnScheduler scheduler,
ApplicationMasterService masterService,
ApplicationSubmissionContext submissionContext,
- Configuration conf, boolean isLastAttempt) {
+ Configuration conf, boolean maybeLastAttempt) {
this.conf = conf;
this.applicationAttemptId = appAttemptId;
this.rmContext = rmContext;
@@ -417,8 +428,9 @@ public class RMAppAttemptImpl implements
this.writeLock = lock.writeLock();
this.proxiedTrackingUrl = generateProxyUriWithScheme(null);
- this.isLastAttempt = isLastAttempt;
+ this.maybeLastAttempt = maybeLastAttempt;
this.stateMachine = stateMachineFactory.make(this);
+ this.attemptMetrics = new RMAppAttemptMetrics(applicationAttemptId);
}
@Override
@@ -526,7 +538,7 @@ public class RMAppAttemptImpl implements
private void setTrackingUrlToRMAppPage() {
originalTrackingUrl = pjoin(
- WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf),
+ WebAppUtils.getResolvedRMWebAppURLWithScheme(conf),
"cluster", "app", getAppAttemptId().getApplicationId());
proxiedTrackingUrl = originalTrackingUrl;
}
@@ -545,7 +557,22 @@ public class RMAppAttemptImpl implements
@Override
public Token<AMRMTokenIdentifier> getAMRMToken() {
- return this.amrmToken;
+ this.readLock.lock();
+ try {
+ return this.amrmToken;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ @Private
+ public void setAMRMToken(Token<AMRMTokenIdentifier> lastToken) {
+ this.writeLock.lock();
+ try {
+ this.amrmToken = lastToken;
+ } finally {
+ this.writeLock.unlock();
+ }
}
@Override
@@ -579,6 +606,15 @@ public class RMAppAttemptImpl implements
}
}
+ public int getAMContainerExitStatus() {
+ this.readLock.lock();
+ try {
+ return this.amContainerExitStatus;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
@Override
public float getProgress() {
this.readLock.lock();
@@ -616,11 +652,6 @@ public class RMAppAttemptImpl implements
}
@Override
- public Set<NodeId> getRanNodes() {
- return ranNodes;
- }
-
- @Override
public Container getMasterContainer() {
this.readLock.lock();
@@ -671,9 +702,7 @@ public class RMAppAttemptImpl implements
ApplicationResourceUsageReport report =
scheduler.getAppResourceUsageReport(this.getAppAttemptId());
if (report == null) {
- Resource none = Resource.newInstance(0, 0);
- report = ApplicationResourceUsageReport.newInstance(0, 0, none, none,
- none);
+ report = RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT;
}
return report;
} finally {
@@ -692,8 +721,13 @@ public class RMAppAttemptImpl implements
+ attemptState.getState());
diagnostics.append("Attempt recovered after RM restart");
diagnostics.append(attemptState.getDiagnostics());
+ this.amContainerExitStatus = attemptState.getAMContainerExitStatus();
+ if (amContainerExitStatus == ContainerExitStatus.PREEMPTED) {
+ this.attemptMetrics.setIsPreempted();
+ }
setMasterContainer(attemptState.getMasterContainer());
- recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials());
+ recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials(),
+ attemptState.getState());
this.recoveredFinalState = attemptState.getState();
this.originalTrackingUrl = attemptState.getFinalTrackingUrl();
this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
@@ -703,12 +737,13 @@ public class RMAppAttemptImpl implements
public void transferStateFromPreviousAttempt(RMAppAttempt attempt) {
this.justFinishedContainers = attempt.getJustFinishedContainers();
- this.ranNodes = attempt.getRanNodes();
}
- private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
- throws IOException {
- if (appAttemptTokens == null) {
+ private void recoverAppAttemptCredentials(Credentials appAttemptTokens,
+ RMAppAttemptState state) throws IOException {
+ if (appAttemptTokens == null || state == RMAppAttemptState.FAILED
+ || state == RMAppAttemptState.FINISHED
+ || state == RMAppAttemptState.KILLED) {
return;
}
@@ -719,12 +754,9 @@ public class RMAppAttemptImpl implements
.registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes);
}
- // Only one AMRMToken is stored per-attempt, so this should be fine. Can't
- // use TokenSelector as service may change - think fail-over.
this.amrmToken =
- (Token<AMRMTokenIdentifier>) appAttemptTokens
- .getToken(RMStateStore.AM_RM_TOKEN_SERVICE);
- rmContext.getAMRMTokenSecretManager().addPersistedPassword(this.amrmToken);
+ rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+ applicationAttemptId);
}
private static class BaseTransition implements
@@ -760,13 +792,6 @@ public class RMAppAttemptImpl implements
.createMasterKey(appAttempt.applicationAttemptId);
}
- // create AMRMToken
- AMRMTokenIdentifier id =
- new AMRMTokenIdentifier(appAttempt.applicationAttemptId);
- appAttempt.amrmToken =
- new Token<AMRMTokenIdentifier>(id,
- appAttempt.rmContext.getAMRMTokenSecretManager());
-
// Add the applicationAttempt to the scheduler and inform the scheduler
// whether to transfer the state from previous attempt.
appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
@@ -838,7 +863,10 @@ public class RMAppAttemptImpl implements
// Set the masterContainer
appAttempt.setMasterContainer(amContainerAllocation.getContainers()
- .get(0));
+ .get(0));
+ RMContainerImpl rmMasterContainer = (RMContainerImpl)appAttempt.scheduler
+ .getRMContainer(appAttempt.getMasterContainer().getId());
+ rmMasterContainer.setAMContainer(true);
// The node set in NMTokenSecrentManager is used for marking whether the
// NMToken has been issued for this node to the AM.
// When AM container was allocated to RM itself, the node which allocates
@@ -875,7 +903,6 @@ public class RMAppAttemptImpl implements
@Override
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
- appAttempt.checkAttemptStoreError(event);
appAttempt.launchAttempt();
}
}
@@ -904,6 +931,18 @@ public class RMAppAttemptImpl implements
}
return appAttempt.recoveredFinalState;
} else {
+ // Add the current attempt to the scheduler.
+ if (appAttempt.rmContext.isWorkPreservingRecoveryEnabled()) {
+ // Need to register an app attempt before AM can register
+ appAttempt.masterService
+ .registerAppAttempt(appAttempt.applicationAttemptId);
+
+ // Add attempt to scheduler synchronously to guarantee scheduler
+ // knows attempts before AM or NM re-registers.
+ appAttempt.scheduler.handle(new AppAttemptAddedSchedulerEvent(
+ appAttempt.getAppAttemptId(), false, true));
+ }
+
/*
* Since the application attempt's final state is not saved that means
* for AM container (previous attempt) state must be one of these.
@@ -947,7 +986,7 @@ public class RMAppAttemptImpl implements
String diags = null;
String finalTrackingUrl = null;
FinalApplicationStatus finalStatus = null;
-
+ int exitStatus = ContainerExitStatus.INVALID;
switch (event.getType()) {
case LAUNCH_FAILED:
RMAppAttemptLaunchFailedEvent launchFaileEvent =
@@ -968,6 +1007,7 @@ public class RMAppAttemptImpl implements
RMAppAttemptContainerFinishedEvent finishEvent =
(RMAppAttemptContainerFinishedEvent) event;
diags = getAMContainerCrashedDiagnostics(finishEvent);
+ exitStatus = finishEvent.getContainerStatus().getExitStatus();
break;
case KILL:
break;
@@ -982,9 +1022,10 @@ public class RMAppAttemptImpl implements
ApplicationAttemptState attemptState =
new ApplicationAttemptState(applicationAttemptId, getMasterContainer(),
rmStore.getCredentialsFromAppAttempt(this), startTime,
- stateToBeStored, finalTrackingUrl, diags, finalStatus);
+ stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus);
LOG.info("Updating application attempt " + applicationAttemptId
- + " with final state: " + targetedFinalState);
+ + " with final state: " + targetedFinalState + ", and exit status: "
+ + exitStatus);
rmStore.updateApplicationAttemptState(attemptState);
}
@@ -1013,14 +1054,6 @@ public class RMAppAttemptImpl implements
@Override
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
- RMAppAttemptUpdateSavedEvent storeEvent = (RMAppAttemptUpdateSavedEvent) event;
- if (storeEvent.getUpdatedException() != null) {
- LOG.error("Failed to update the final state of application attempt: "
- + storeEvent.getApplicationAttemptId(),
- storeEvent.getUpdatedException());
- ExitUtil.terminate(1, storeEvent.getUpdatedException());
- }
-
RMAppAttemptEvent causeEvent = appAttempt.eventCausingFinalSaving;
if (appAttempt.transitionTodo instanceof SingleArcTransition) {
@@ -1077,11 +1110,20 @@ public class RMAppAttemptImpl implements
// don't leave the tracking URL pointing to a non-existent AM
appAttempt.setTrackingUrlToRMAppPage();
appAttempt.invalidateAMHostAndPort();
+
if (appAttempt.submissionContext
.getKeepContainersAcrossApplicationAttempts()
- && !appAttempt.isLastAttempt
&& !appAttempt.submissionContext.getUnmanagedAM()) {
- keepContainersAcrossAppAttempts = true;
+ // See if we should retain containers for non-unmanaged applications
+ if (!appAttempt.shouldCountTowardsMaxAttemptRetry()) {
+ // Premption, hardware failures, NM resync doesn't count towards
+ // app-failures and so we should retain containers.
+ keepContainersAcrossAppAttempts = true;
+ } else if (!appAttempt.maybeLastAttempt) {
+ // Not preemption, hardware failures or NM resync.
+ // Not last-attempt too - keep containers.
+ keepContainersAcrossAppAttempts = true;
+ }
}
appEvent =
new RMAppFailedAttemptEvent(applicationId,
@@ -1121,18 +1163,31 @@ public class RMAppAttemptImpl implements
appAttempt.getClientTokenMasterKey());
}
}
-
+
+  @Override
+  public boolean shouldCountTowardsMaxAttemptRetry() { // false if AM exit was PREEMPTED/ABORTED/DISKS_FAILED/KILLED_BY_RESOURCEMANAGER
+    this.readLock.lock(); // acquire before try: finally must only release a lock that is held
+    try {
+      int exitStatus = getAMContainerExitStatus();
+      return !(exitStatus == ContainerExitStatus.PREEMPTED
+          || exitStatus == ContainerExitStatus.ABORTED
+          || exitStatus == ContainerExitStatus.DISKS_FAILED
+          || exitStatus == ContainerExitStatus.KILLED_BY_RESOURCEMANAGER);
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
private static final class UnmanagedAMAttemptSavedTransition
extends AMLaunchedTransition {
@Override
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
- appAttempt.checkAttemptStoreError(event);
- // TODO Today unmanaged AM client is waiting for app state to be Accepted to
- // launch the AM. This is broken since we changed to start the attempt
- // after the application is Accepted. We may need to introduce an attempt
- // report that client can rely on to query the attempt state and choose to
- // launch the unmanaged AM.
+ // create AMRMToken
+ appAttempt.amrmToken =
+ appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+ appAttempt.applicationAttemptId);
+
super.transition(appAttempt, event);
}
}
@@ -1207,17 +1262,16 @@ public class RMAppAttemptImpl implements
}
}
- private static final class AMContainerCrashedTransition extends
+ private static final class AMContainerCrashedBeforeRunningTransition extends
BaseFinalTransition {
- public AMContainerCrashedTransition() {
+ public AMContainerCrashedBeforeRunningTransition() {
super(RMAppAttemptState.FAILED);
}
@Override
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
-
RMAppAttemptContainerFinishedEvent finishEvent =
((RMAppAttemptContainerFinishedEvent)event);
@@ -1225,23 +1279,39 @@ public class RMAppAttemptImpl implements
appAttempt.rmContext.getAMLivelinessMonitor().unregister(
appAttempt.getAppAttemptId());
- // Setup diagnostic message
- appAttempt.diagnostics
- .append(getAMContainerCrashedDiagnostics(finishEvent));
+ // Setup diagnostic message and exit status
+ appAttempt.setAMContainerCrashedDiagnosticsAndExitStatus(finishEvent);
+
// Tell the app, scheduler
super.transition(appAttempt, finishEvent);
}
}
- private static String getAMContainerCrashedDiagnostics(
+ private void setAMContainerCrashedDiagnosticsAndExitStatus(
+ RMAppAttemptContainerFinishedEvent finishEvent) {
+ ContainerStatus status = finishEvent.getContainerStatus();
+ String diagnostics = getAMContainerCrashedDiagnostics(finishEvent);
+ this.diagnostics.append(diagnostics);
+ this.amContainerExitStatus = status.getExitStatus();
+ }
+
+ private String getAMContainerCrashedDiagnostics(
RMAppAttemptContainerFinishedEvent finishEvent) {
ContainerStatus status = finishEvent.getContainerStatus();
- String diagnostics =
- "AM Container for " + finishEvent.getApplicationAttemptId()
- + " exited with " + " exitCode: " + status.getExitStatus()
- + " due to: " + status.getDiagnostics() + "."
- + "Failing this attempt.";
- return diagnostics;
+    StringBuilder diagnosticsBuilder = new StringBuilder();
+    diagnosticsBuilder.append("AM Container for ").append(
+        finishEvent.getApplicationAttemptId()).append(
+        " exited with exitCode: ").append(status.getExitStatus()).
+        append("\n");
+    if (this.getTrackingUrl() != null) { // pad the URL so it is not fused with adjacent text
+      diagnosticsBuilder.append("For more detailed output,").append(
+          " check application tracking page: ").append(
+          this.getTrackingUrl()).append(
+          " Then, click on links to logs of each attempt.\n");
+    }
+    diagnosticsBuilder.append("Diagnostics: ").append(status.getDiagnostics())
+        .append("\nFailing this attempt."); // newline separates NM diagnostics from the verdict
+    return diagnosticsBuilder.toString();
}
private static class FinalTransition extends BaseFinalTransition {
@@ -1395,27 +1465,26 @@ public class RMAppAttemptImpl implements
finalStatus = unregisterEvent.getFinalApplicationStatus();
}
- private static final class ContainerAcquiredTransition extends
- BaseTransition {
- @Override
- public void transition(RMAppAttemptImpl appAttempt,
- RMAppAttemptEvent event) {
- RMAppAttemptContainerAcquiredEvent acquiredEvent
- = (RMAppAttemptContainerAcquiredEvent) event;
- appAttempt.ranNodes.add(acquiredEvent.getContainer().getNodeId());
- }
- }
-
private static final class ContainerFinishedTransition
implements
MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
+ // The transition To Do after attempt final state is saved.
+ private BaseTransition transitionToDo;
+ private RMAppAttemptState currentState;
+
+ public ContainerFinishedTransition(BaseTransition transitionToDo,
+ RMAppAttemptState currentState) {
+ this.transitionToDo = transitionToDo;
+ this.currentState = currentState;
+ }
+
@Override
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
- RMAppAttemptContainerFinishedEvent containerFinishedEvent
- = (RMAppAttemptContainerFinishedEvent) event;
+ RMAppAttemptContainerFinishedEvent containerFinishedEvent =
+ (RMAppAttemptContainerFinishedEvent) event;
ContainerStatus containerStatus =
containerFinishedEvent.getContainerStatus();
@@ -1423,27 +1492,28 @@ public class RMAppAttemptImpl implements
// the AMContainer, AppAttempt fails
if (appAttempt.masterContainer != null
&& appAttempt.masterContainer.getId().equals(
- containerStatus.getContainerId())) {
+ containerStatus.getContainerId())) {
+
// Remember the follow up transition and save the final attempt state.
appAttempt.rememberTargetTransitionsAndStoreState(event,
- new ContainerFinishedFinalStateSavedTransition(),
- RMAppAttemptState.FAILED, RMAppAttemptState.FAILED);
+ transitionToDo, RMAppAttemptState.FAILED, RMAppAttemptState.FAILED);
return RMAppAttemptState.FINAL_SAVING;
}
- // Normal container.Put it in completedcontainers list
+ // Normal container.Put it in completed containers list
appAttempt.justFinishedContainers.add(containerStatus);
- return RMAppAttemptState.RUNNING;
+ return this.currentState;
}
}
- private static final class ContainerFinishedAtFailedTransition
+ private static final class ContainerFinishedAtFinalStateTransition
extends BaseTransition {
@Override
public void
transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
RMAppAttemptContainerFinishedEvent containerFinishedEvent =
(RMAppAttemptContainerFinishedEvent) event;
+
ContainerStatus containerStatus =
containerFinishedEvent.getContainerStatus();
// Normal container. Add it in completed containers list
@@ -1451,18 +1521,17 @@ public class RMAppAttemptImpl implements
}
}
- private static class ContainerFinishedFinalStateSavedTransition extends
+ private static class AMContainerCrashedAtRunningTransition extends
BaseTransition {
@Override
public void
transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
- RMAppAttemptContainerFinishedEvent containerFinishedEvent =
+ RMAppAttemptContainerFinishedEvent finishEvent =
(RMAppAttemptContainerFinishedEvent) event;
// container associated with AM. must not be unmanaged
assert appAttempt.submissionContext.getUnmanagedAM() == false;
- // Setup diagnostic message
- appAttempt.diagnostics
- .append(getAMContainerCrashedDiagnostics(containerFinishedEvent));
+ // Setup diagnostic message and exit status
+ appAttempt.setAMContainerCrashedDiagnosticsAndExitStatus(finishEvent);
new FinalTransition(RMAppAttemptState.FAILED).transition(appAttempt,
event);
}
@@ -1606,18 +1675,6 @@ public class RMAppAttemptImpl implements
rmContext.getAMLivelinessMonitor().register(getAppAttemptId());
}
- private void checkAttemptStoreError(RMAppAttemptEvent event) {
- RMAppAttemptNewSavedEvent storeEvent = (RMAppAttemptNewSavedEvent) event;
- if(storeEvent.getStoredException() != null)
- {
- // This needs to be handled for HA and give up master status if we got
- // fenced
- LOG.error("Failed to store attempt: " + getAppAttemptId(),
- storeEvent.getStoredException());
- ExitUtil.terminate(1, storeEvent.getStoredException());
- }
- }
-
private void storeAttempt() {
// store attempt data in a non-blocking manner to prevent dispatcher
// thread starvation and wait for state to be saved
@@ -1663,4 +1720,16 @@ public class RMAppAttemptImpl implements
}
return attemptReport;
}
+
+ // for testing
+ public boolean mayBeLastAttempt() {
+ return maybeLastAttempt;
+ }
+
+ @Override
+ public RMAppAttemptMetrics getRMAppAttemptMetrics() {
+ // didn't use read/write lock here because RMAppAttemptMetrics has its own
+ // lock
+ return attemptMetrics;
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java Tue Aug 19 23:49:39 2014
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
+import java.util.List;
+
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -26,6 +28,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler;
/**
@@ -58,7 +61,7 @@ public interface RMContainer extends Eve
Priority getAllocatedPriority();
- long getStartTime();
+ long getCreationTime();
long getFinishTime();
@@ -71,5 +74,9 @@ public interface RMContainer extends Eve
ContainerState getContainerState();
ContainerReport createContainerReport();
+
+ boolean isAMContainer();
+
+ List<ResourceRequest> getResourceRequests();
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java Tue Aug 19 23:49:39 2014
@@ -33,5 +33,7 @@ public enum RMContainerEventType {
RELEASED,
// Source: ContainerAllocationExpirer
- EXPIRE
+ EXPIRE,
+
+ RECOVER
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java Tue Aug 19 23:49:39 2014
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
import java.util.EnumSet;
+import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -27,6 +28,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -34,13 +36,17 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
@@ -65,6 +71,9 @@ public class RMContainerImpl implements
RMContainerEventType.KILL)
.addTransition(RMContainerState.NEW, RMContainerState.RESERVED,
RMContainerEventType.RESERVED, new ContainerReservedTransition())
+ .addTransition(RMContainerState.NEW,
+ EnumSet.of(RMContainerState.RUNNING, RMContainerState.COMPLETED),
+ RMContainerEventType.RECOVER, new ContainerRecoveredTransition())
// Transitions from RESERVED state
.addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED,
@@ -147,27 +156,35 @@ public class RMContainerImpl implements
private Resource reservedResource;
private NodeId reservedNode;
private Priority reservedPriority;
- private long startTime;
+ private long creationTime;
private long finishTime;
private ContainerStatus finishedStatus;
+ private boolean isAMContainer;
+ private List<ResourceRequest> resourceRequests;
-
-
+ public RMContainerImpl(Container container,
+ ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
+ RMContext rmContext) {
+ this(container, appAttemptId, nodeId, user, rmContext, System
+ .currentTimeMillis());
+ }
public RMContainerImpl(Container container,
ApplicationAttemptId appAttemptId, NodeId nodeId,
- String user, RMContext rmContext) {
+ String user, RMContext rmContext, long creationTime) {
this.stateMachine = stateMachineFactory.make(this);
this.containerId = container.getId();
this.nodeId = nodeId;
this.container = container;
this.appAttemptId = appAttemptId;
this.user = user;
- this.startTime = System.currentTimeMillis();
+ this.creationTime = creationTime;
this.rmContext = rmContext;
this.eventHandler = rmContext.getDispatcher().getEventHandler();
this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer();
-
+ this.isAMContainer = false;
+ this.resourceRequests = null;
+
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
@@ -232,8 +249,8 @@ public class RMContainerImpl implements
}
@Override
- public long getStartTime() {
- return startTime;
+ public long getCreationTime() {
+ return creationTime;
}
@Override
@@ -298,6 +315,25 @@ public class RMContainerImpl implements
readLock.unlock();
}
}
+
+  @Override
+  public List<ResourceRequest> getResourceRequests() { // may be null: initialised and later cleared to null
+    readLock.lock(); // acquire before try: finally must only release a lock that is held
+    try {
+      return resourceRequests;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public void setResourceRequests(List<ResourceRequest> requests) {
+    writeLock.lock(); // acquire before try: finally must only release a lock that is held
+    try {
+      this.resourceRequests = requests;
+    } finally {
+      writeLock.unlock();
+    }
+  }
@Override
public String toString() {
@@ -305,6 +341,25 @@ public class RMContainerImpl implements
}
@Override
+  public boolean isAMContainer() {
+    readLock.lock(); // acquire before try: finally must only release a lock that is held
+    try {
+      return isAMContainer;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public void setAMContainer(boolean isAMContainer) {
+    writeLock.lock(); // acquire before try: finally must only release a lock that is held
+    try {
+      this.isAMContainer = isAMContainer;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+ @Override
public void handle(RMContainerEvent event) {
LOG.debug("Processing " + event.getContainerId() + " of type " + event.getType());
try {
@@ -341,6 +396,38 @@ public class RMContainerImpl implements
}
}
+ private static final class ContainerRecoveredTransition
+ implements
+ MultipleArcTransition<RMContainerImpl, RMContainerEvent, RMContainerState> {
+ @Override
+ public RMContainerState transition(RMContainerImpl container,
+ RMContainerEvent event) {
+ NMContainerStatus report =
+ ((RMContainerRecoverEvent) event).getContainerReport();
+ if (report.getContainerState().equals(ContainerState.COMPLETE)) {
+ ContainerStatus status =
+ ContainerStatus.newInstance(report.getContainerId(),
+ report.getContainerState(), report.getDiagnostics(),
+ report.getContainerExitStatus());
+
+ new FinishedTransition().transition(container,
+ new RMContainerFinishedEvent(container.containerId, status,
+ RMContainerEventType.FINISHED));
+ return RMContainerState.COMPLETED;
+ } else if (report.getContainerState().equals(ContainerState.RUNNING)) {
+ // Tell the app
+ container.eventHandler.handle(new RMAppRunningOnNodeEvent(container
+ .getApplicationAttemptId().getApplicationId(), container.nodeId));
+ return RMContainerState.RUNNING;
+ } else {
+ // This can never happen.
+ LOG.warn("RMContainer received unexpected recover event with container"
+ + " state " + report.getContainerState() + " while recovering.");
+ return RMContainerState.RUNNING;
+ }
+ }
+ }
+
private static final class ContainerReservedTransition extends
BaseTransition {
@@ -368,12 +455,15 @@ public class RMContainerImpl implements
@Override
public void transition(RMContainerImpl container, RMContainerEvent event) {
+ // Clear ResourceRequest stored in RMContainer
+ container.setResourceRequests(null);
+
// Register with containerAllocationExpirer.
container.containerAllocationExpirer.register(container.getContainerId());
- // Tell the appAttempt
- container.eventHandler.handle(new RMAppAttemptContainerAcquiredEvent(
- container.getApplicationAttemptId(), container.getContainer()));
+ // Tell the app
+ container.eventHandler.handle(new RMAppRunningOnNodeEvent(container
+ .getApplicationAttemptId().getApplicationId(), container.nodeId));
}
}
@@ -396,11 +486,30 @@ public class RMContainerImpl implements
container.finishTime = System.currentTimeMillis();
container.finishedStatus = finishedEvent.getRemoteContainerStatus();
// Inform AppAttempt
+ // container.getContainer() can return null when a RMContainer is a
+ // reserved container
+ updateMetricsIfPreempted(container);
+
container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent(
- container.appAttemptId, finishedEvent.getRemoteContainerStatus()));
+ container.appAttemptId, finishedEvent.getRemoteContainerStatus()));
- container.rmContext.getRMApplicationHistoryWriter()
- .containerFinished(container);
+ container.rmContext.getRMApplicationHistoryWriter().containerFinished(
+ container);
+ }
+
+ /**
+ * Records preemption info on the app's current attempt when the finished
+ * container exited with ContainerExitStatus.PREEMPTED. Does nothing if the
+ * Container, the RMApp, or its current attempt is not available, instead of
+ * failing the finish transition with a NullPointerException.
+ */
+ private static void updateMetricsIfPreempted(RMContainerImpl container) {
+ // Only preempted containers contribute to the preemption metrics.
+ if (ContainerExitStatus.PREEMPTED != container.finishedStatus
+ .getExitStatus()) {
+ return;
+ }
+ // The caller notes getContainer() can be null for a reserved container.
+ if (container.getContainer() == null) {
+ return;
+ }
+ Resource resource = container.getContainer().getResource();
+ // The app may already have been removed from the RMContext.
+ org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp app =
+ container.rmContext.getRMApps()
+ .get(container.getApplicationAttemptId().getApplicationId());
+ // NOTE(review): this is the app's *current* attempt, which may differ
+ // from container.appAttemptId if the container outlived its attempt.
+ RMAppAttempt rmAttempt = app == null ? null : app.getCurrentAppAttempt();
+ if (rmAttempt != null) {
+ rmAttempt.getRMAppAttemptMetrics().updatePreemptionInfo(resource,
+ container);
+ }
}
}
@@ -442,7 +551,7 @@ public class RMContainerImpl implements
try {
containerReport = ContainerReport.newInstance(this.getContainerId(),
this.getAllocatedResource(), this.getAllocatedNode(),
- this.getAllocatedPriority(), this.getStartTime(),
+ this.getAllocatedPriority(), this.getCreationTime(),
this.getFinishTime(), this.getDiagnosticsInfo(), this.getLogURL(),
this.getContainerExitStatus(), this.getContainerState());
} finally {
@@ -450,5 +559,4 @@ public class RMContainerImpl implements
}
return containerReport;
}
-
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Tue Aug 19 23:49:39 2014
@@ -20,9 +20,8 @@ package org.apache.hadoop.yarn.server.re
import java.util.ArrayList;
import java.util.EnumSet;
-import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -48,12 +47,15 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -91,9 +93,9 @@ public class RMNodeImpl implements RMNod
private final RMContext context;
private final String hostName;
private final int commandPort;
- private final int httpPort;
+ // Reassigned in ReconnectNodeTransition, so volatile (like resourceOption)
+ // for cross-thread visibility; NOTE(review): assumes getters read unlocked.
+ private volatile int httpPort;
private final String nodeAddress; // The containerManager address
- private final String httpAddress;
+ private volatile String httpAddress; // Also reassigned on reconnect.
private volatile ResourceOption resourceOption;
private final Node node;
@@ -102,8 +104,8 @@ public class RMNodeImpl implements RMNod
private String nodeManagerVersion;
/* set of containers that have just launched */
- private final Map<ContainerId, ContainerStatus> justLaunchedContainers =
- new HashMap<ContainerId, ContainerStatus>();
+ private final Set<ContainerId> launchedContainers =
+ new HashSet<ContainerId>();
/* set of containers that need to be cleaned */
private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
@@ -454,19 +456,33 @@ public class RMNodeImpl implements RMNod
}
}
+ /**
+ * Handles one application reported as running on a node. A known app gets
+ * an RMAppRunningOnNodeEvent; an app the RM cannot find is queued on the
+ * node's finishedApplications so it can be cleaned up on the NM.
+ */
+ private static void handleRunningAppOnNode(RMNodeImpl rmNode,
+ RMContext context, ApplicationId appId, NodeId nodeId) {
+ RMApp app = context.getRMApps().get(appId);
+ if (app != null) {
+ context.getDispatcher().getEventHandler()
+ .handle(new RMAppRunningOnNodeEvent(appId, nodeId));
+ } else {
+ // Lookup failed (something may have gone wrong); let the NM clean it up.
+ LOG.warn("Cannot get RMApp by appId=" + appId
+ + ", just added it to finishedApplications list for cleanup");
+ rmNode.finishedApplications.add(appId);
+ }
+ }
+
public static class AddNodeTransition implements
SingleArcTransition<RMNodeImpl, RMNodeEvent> {
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Inform the scheduler
+ RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event;
+ List<NMContainerStatus> containers = null;
- rmNode.context.getDispatcher().getEventHandler().handle(
- new NodeAddedSchedulerEvent(rmNode));
- rmNode.context.getDispatcher().getEventHandler().handle(
- new NodesListManagerEvent(
- NodesListManagerEventType.NODE_USABLE, rmNode));
-
String host = rmNode.nodeId.getHost();
if (rmNode.context.getInactiveRMNodes().containsKey(host)) {
// Old node rejoining
@@ -476,44 +492,51 @@ public class RMNodeImpl implements RMNod
} else {
// Increment activeNodes explicitly because this is a new node.
ClusterMetrics.getMetrics().incrNumActiveNodes();
+ containers = startEvent.getNMContainerStatuses();
+ if (containers != null && !containers.isEmpty()) {
+ for (NMContainerStatus container : containers) {
+ if (container.getContainerState() == ContainerState.RUNNING) {
+ rmNode.launchedContainers.add(container.getContainerId());
+ }
+ }
+ }
}
+
+ if (null != startEvent.getRunningApplications()) {
+ for (ApplicationId appId : startEvent.getRunningApplications()) {
+ handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
+ }
+ }
+
+ rmNode.context.getDispatcher().getEventHandler()
+ .handle(new NodeAddedSchedulerEvent(rmNode, containers));
+ rmNode.context.getDispatcher().getEventHandler().handle(
+ new NodesListManagerEvent(
+ NodesListManagerEventType.NODE_USABLE, rmNode));
}
}
-
+
public static class ReconnectNodeTransition implements
SingleArcTransition<RMNodeImpl, RMNodeEvent> {
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
- // Kill containers since node is rejoining.
- rmNode.nodeUpdateQueue.clear();
- rmNode.context.getDispatcher().getEventHandler().handle(
- new NodeRemovedSchedulerEvent(rmNode));
-
- RMNode newNode = ((RMNodeReconnectEvent)event).getReconnectedNode();
- if (rmNode.getTotalCapability().equals(newNode.getTotalCapability())
- && rmNode.getHttpPort() == newNode.getHttpPort()) {
- // Reset heartbeat ID since node just restarted.
- rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
- if (rmNode.getState() != NodeState.UNHEALTHY) {
- // Only add new node if old state is not UNHEALTHY
- rmNode.context.getDispatcher().getEventHandler().handle(
- new NodeAddedSchedulerEvent(rmNode));
+ RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event;
+ RMNode newNode = reconnectEvent.getReconnectedNode();
+ rmNode.nodeManagerVersion = newNode.getNodeManagerVersion();
+ rmNode.httpPort = newNode.getHttpPort();
+ rmNode.httpAddress = newNode.getHttpAddress();
+ rmNode.resourceOption = newNode.getResourceOption();
+
+ // Reset heartbeat ID since node just restarted.
+ rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
+
+ if (null != reconnectEvent.getRunningApplications()) {
+ for (ApplicationId appId : reconnectEvent.getRunningApplications()) {
+ handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
}
- } else {
- // Reconnected node differs, so replace old node and start new node
- switch (rmNode.getState()) {
- case RUNNING:
- ClusterMetrics.getMetrics().decrNumActiveNodes();
- break;
- case UNHEALTHY:
- ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
- break;
- }
- rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
- rmNode.context.getDispatcher().getEventHandler().handle(
- new RMNodeEvent(newNode.getNodeID(), RMNodeEventType.STARTED));
}
+
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_USABLE, rmNode));
@@ -633,14 +656,14 @@ public class RMNodeImpl implements RMNod
// Process running containers
if (remoteContainer.getState() == ContainerState.RUNNING) {
- if (!rmNode.justLaunchedContainers.containsKey(containerId)) {
+ if (!rmNode.launchedContainers.contains(containerId)) {
// Just launched container. RM knows about it the first time.
- rmNode.justLaunchedContainers.put(containerId, remoteContainer);
+ rmNode.launchedContainers.add(containerId);
newlyLaunchedContainers.add(remoteContainer);
}
} else {
// A finished container
- rmNode.justLaunchedContainers.remove(containerId);
+ rmNode.launchedContainers.remove(containerId);
completedContainers.add(remoteContainer);
}
}
@@ -717,4 +740,10 @@ public class RMNodeImpl implements RMNod
public int getQueueSize() {
return nodeUpdateQueue.size();
}
+
+ /**
+ * Returns the ids of containers this node has recorded as launched.
+ * Test-only accessor: this is the live internal set, not a copy.
+ */
+ @VisibleForTesting
+ public Set<ContainerId> getLaunchedContainers() {
+ return launchedContainers;
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java Tue Aug 19 23:49:39 2014
@@ -18,17 +18,27 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
public class RMNodeReconnectEvent extends RMNodeEvent {
private RMNode reconnectedNode;
+ // Assigned once in the constructor; may be null (RMNodeImpl null-checks it).
+ private final List<ApplicationId> runningApplications;
- public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode) {
+ /**
+ * @param nodeId id of the node that reconnected
+ * @param newNode RMNode describing the node as it reconnected
+ * @param runningApps applications reported as running on the node;
+ * may be null
+ */
+ public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode,
+ List<ApplicationId> runningApps) {
super(nodeId, RMNodeEventType.RECONNECTED);
reconnectedNode = newNode;
+ runningApplications = runningApps;
}
public RMNode getReconnectedNode() {
return reconnectedNode;
}
+
+ /**
+ * @return the applications reported as running on the reconnected node,
+ * exactly as passed to the constructor (callers must handle null)
+ */
+ public List<ApplicationId> getRunningApplications() {
+ return runningApplications;
+ }
}