You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/11/10 21:09:16 UTC
svn commit: r1540535 [3/5] - in
/hadoop/common/branches/YARN-321/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/
hadoop-yarn/ha...
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Sun Nov 10 20:09:09 2013
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
@@ -40,7 +39,6 @@ import org.apache.commons.lang.StringUti
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@@ -55,7 +53,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -82,11 +79,10 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
@@ -98,7 +94,6 @@ import org.apache.hadoop.yarn.state.Mult
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
-import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -142,7 +137,7 @@ public class RMAppAttemptImpl implements
private float progress = 0;
private String host = "N/A";
private int rpcPort;
- private String origTrackingUrl = "N/A";
+ private String originalTrackingUrl = "N/A";
private String proxiedTrackingUrl = "N/A";
private long startTime = 0;
@@ -157,6 +152,11 @@ public class RMAppAttemptImpl implements
private static final ExpiredTransition EXPIRED_TRANSITION =
new ExpiredTransition();
+ private RMAppAttemptEvent eventCausingFinalSaving;
+ private RMAppAttemptState targetedFinalState;
+ private RMAppAttemptState recoveredFinalState;
+ private Object transitionTodo;
+
private static final StateMachineFactory<RMAppAttemptImpl,
RMAppAttemptState,
RMAppAttemptEventType,
@@ -169,68 +169,80 @@ public class RMAppAttemptImpl implements
// Transitions from NEW State
.addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED,
RMAppAttemptEventType.START, new AttemptStartedTransition())
- .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.KILLED,
+ .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.KILL,
- new BaseFinalTransition(RMAppAttemptState.KILLED))
- .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FAILED,
+ new FinalSavingTransition(new BaseFinalTransition(
+ RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
+ .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.REGISTERED,
- new UnexpectedAMRegisteredTransition())
- .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.RECOVERED,
- RMAppAttemptEventType.RECOVER)
+ new FinalSavingTransition(
+ new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED))
+ .addTransition( RMAppAttemptState.NEW,
+ EnumSet.of(RMAppAttemptState.FINISHED, RMAppAttemptState.KILLED,
+ RMAppAttemptState.FAILED, RMAppAttemptState.RECOVERED),
+ RMAppAttemptEventType.RECOVER, new AttemptRecoveredTransition())
// Transitions from SUBMITTED state
- .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FAILED,
- RMAppAttemptEventType.APP_REJECTED, new AppRejectedTransition())
+ .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FINAL_SAVING,
+ RMAppAttemptEventType.APP_REJECTED,
+ new FinalSavingTransition(new AppRejectedTransition(),
+ RMAppAttemptState.FAILED))
.addTransition(RMAppAttemptState.SUBMITTED,
EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
RMAppAttemptState.SCHEDULED),
RMAppAttemptEventType.APP_ACCEPTED,
new ScheduleTransition())
- .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.KILLED,
+ .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.KILL,
- new BaseFinalTransition(RMAppAttemptState.KILLED))
- .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FAILED,
+ new FinalSavingTransition(new BaseFinalTransition(
+ RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
+ .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.REGISTERED,
- new UnexpectedAMRegisteredTransition())
+ new FinalSavingTransition(
+ new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED))
// Transitions from SCHEDULED State
.addTransition(RMAppAttemptState.SCHEDULED,
RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptEventType.CONTAINER_ALLOCATED,
new AMContainerAllocatedTransition())
- .addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.KILLED,
+ .addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.KILL,
- new BaseFinalTransition(RMAppAttemptState.KILLED))
+ new FinalSavingTransition(new BaseFinalTransition(
+ RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
// Transitions from ALLOCATED_SAVING State
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptState.ALLOCATED,
- RMAppAttemptEventType.ATTEMPT_SAVED, new AttemptStoredTransition())
+ RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptEventType.CONTAINER_ACQUIRED,
new ContainerAcquiredTransition())
// App could be killed by the client. So need to handle this.
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
- RMAppAttemptState.KILLED,
+ RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.KILL,
- new BaseFinalTransition(RMAppAttemptState.KILLED))
-
+ new FinalSavingTransition(new BaseFinalTransition(
+ RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
+
// Transitions from LAUNCHED_UNMANAGED_SAVING State
.addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
RMAppAttemptState.LAUNCHED,
- RMAppAttemptEventType.ATTEMPT_SAVED,
+ RMAppAttemptEventType.ATTEMPT_NEW_SAVED,
new UnmanagedAMAttemptSavedTransition())
// attempt should not try to register in this state
.addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
- RMAppAttemptState.FAILED,
+ RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.REGISTERED,
- new UnexpectedAMRegisteredTransition())
+ new FinalSavingTransition(
+ new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED))
// App could be killed by the client. So need to handle this.
.addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
- RMAppAttemptState.KILLED,
+ RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.KILL,
- new BaseFinalTransition(RMAppAttemptState.KILLED))
+ new FinalSavingTransition(new BaseFinalTransition(
+ RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
// Transitions from ALLOCATED State
.addTransition(RMAppAttemptState.ALLOCATED,
@@ -239,32 +251,40 @@ public class RMAppAttemptImpl implements
new ContainerAcquiredTransition())
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.LAUNCHED,
RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition())
- .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FAILED,
- RMAppAttemptEventType.LAUNCH_FAILED, new LaunchFailedTransition())
- .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.KILLED,
- RMAppAttemptEventType.KILL, new KillAllocatedAMTransition())
+ .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
+ RMAppAttemptEventType.LAUNCH_FAILED,
+ new FinalSavingTransition(new LaunchFailedTransition(),
+ RMAppAttemptState.FAILED))
+ .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
+ RMAppAttemptEventType.KILL,
+ new FinalSavingTransition(
+ new KillAllocatedAMTransition(), RMAppAttemptState.KILLED))
- .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FAILED,
+ .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.CONTAINER_FINISHED,
- new AMContainerCrashedTransition())
+ new FinalSavingTransition(
+ new AMContainerCrashedTransition(), RMAppAttemptState.FAILED))
// Transitions from LAUNCHED State
.addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING,
RMAppAttemptEventType.REGISTERED, new AMRegisteredTransition())
- .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FAILED,
+ .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.CONTAINER_FINISHED,
- new AMContainerCrashedTransition())
+ new FinalSavingTransition(
+ new AMContainerCrashedTransition(), RMAppAttemptState.FAILED))
.addTransition(
- RMAppAttemptState.LAUNCHED, RMAppAttemptState.FAILED,
+ RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.EXPIRE,
- EXPIRED_TRANSITION)
- .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.KILLED,
+ new FinalSavingTransition(EXPIRED_TRANSITION,
+ RMAppAttemptState.FAILED))
+ .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.KILL,
- new FinalTransition(RMAppAttemptState.KILLED))
+ new FinalSavingTransition(new FinalTransition(
+ RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
// Transitions from RUNNING State
.addTransition(RMAppAttemptState.RUNNING,
- EnumSet.of(RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED),
+ EnumSet.of(RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINISHED),
RMAppAttemptEventType.UNREGISTERED, new AMUnregisteredTransition())
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition())
@@ -276,17 +296,41 @@ public class RMAppAttemptImpl implements
new ContainerAcquiredTransition())
.addTransition(
RMAppAttemptState.RUNNING,
- EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FAILED),
+ EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING),
RMAppAttemptEventType.CONTAINER_FINISHED,
new ContainerFinishedTransition())
.addTransition(
- RMAppAttemptState.RUNNING, RMAppAttemptState.FAILED,
+ RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.EXPIRE,
- EXPIRED_TRANSITION)
+ new FinalSavingTransition(EXPIRED_TRANSITION,
+ RMAppAttemptState.FAILED))
.addTransition(
- RMAppAttemptState.RUNNING, RMAppAttemptState.KILLED,
+ RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.KILL,
- new FinalTransition(RMAppAttemptState.KILLED))
+ new FinalSavingTransition(new FinalTransition(
+ RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
+
+ // Transitions from FINAL_SAVING State
+ .addTransition(RMAppAttemptState.FINAL_SAVING,
+ EnumSet.of(RMAppAttemptState.FINISHING, RMAppAttemptState.FAILED,
+ RMAppAttemptState.KILLED, RMAppAttemptState.FINISHED),
+ RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED,
+ new FinalStateSavedTransition())
+ .addTransition(RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINAL_SAVING,
+ RMAppAttemptEventType.CONTAINER_FINISHED,
+ new ContainerFinishedAtFinalSavingTransition())
+ .addTransition(RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINAL_SAVING,
+ RMAppAttemptEventType.EXPIRE,
+ new AMExpiredAtFinalSavingTransition())
+ .addTransition(RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINAL_SAVING,
+ EnumSet.of(
+ RMAppAttemptEventType.UNREGISTERED,
+ RMAppAttemptEventType.STATUS_UPDATE,
+ // TODO: the scheduler should be fixed to reject container allocation
+ // requests while the attempt is in the FINAL_SAVING state
+ RMAppAttemptEventType.CONTAINER_ALLOCATED,
+ RMAppAttemptEventType.CONTAINER_ACQUIRED,
+ RMAppAttemptEventType.KILL))
// Transitions from FAILED State
.addTransition(
@@ -338,7 +382,6 @@ public class RMAppAttemptImpl implements
RMAppAttemptEventType.EXPIRE,
RMAppAttemptEventType.REGISTERED,
RMAppAttemptEventType.CONTAINER_ALLOCATED,
- RMAppAttemptEventType.ATTEMPT_SAVED,
RMAppAttemptEventType.CONTAINER_FINISHED,
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.KILL,
@@ -357,7 +400,7 @@ public class RMAppAttemptImpl implements
RMAppAttemptEventType.REGISTERED,
RMAppAttemptEventType.CONTAINER_ALLOCATED,
RMAppAttemptEventType.CONTAINER_ACQUIRED,
- RMAppAttemptEventType.ATTEMPT_SAVED,
+ RMAppAttemptEventType.ATTEMPT_NEW_SAVED,
RMAppAttemptEventType.CONTAINER_FINISHED,
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.KILL,
@@ -411,7 +454,7 @@ public class RMAppAttemptImpl implements
public RMAppAttemptState getAppAttemptState() {
this.readLock.lock();
try {
- return this.stateMachine.getCurrentState();
+ return this.stateMachine.getCurrentState();
} finally {
this.readLock.unlock();
}
@@ -444,7 +487,7 @@ public class RMAppAttemptImpl implements
this.readLock.lock();
try {
return (getSubmissionContext().getUnmanagedAM()) ?
- this.origTrackingUrl : this.proxiedTrackingUrl;
+ this.originalTrackingUrl : this.proxiedTrackingUrl;
} finally {
this.readLock.unlock();
}
@@ -454,7 +497,7 @@ public class RMAppAttemptImpl implements
public String getOriginalTrackingUrl() {
this.readLock.lock();
try {
- return this.origTrackingUrl;
+ return this.originalTrackingUrl;
} finally {
this.readLock.unlock();
}
@@ -490,10 +533,10 @@ public class RMAppAttemptImpl implements
}
private void setTrackingUrlToRMAppPage() {
- origTrackingUrl = pjoin(
+ originalTrackingUrl = pjoin(
WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf),
"cluster", "app", getAppAttemptId().getApplicationId());
- proxiedTrackingUrl = origTrackingUrl;
+ proxiedTrackingUrl = originalTrackingUrl;
}
// This is only used for RMStateStore. Normal operation must invoke the secret
@@ -539,16 +582,6 @@ public class RMAppAttemptImpl implements
}
}
- public void setDiagnostics(String message) {
- this.writeLock.lock();
-
- try {
- this.diagnostics.append(message);
- } finally {
- this.writeLock.unlock();
- }
- }
-
@Override
public float getProgress() {
this.readLock.lock();
@@ -635,57 +668,34 @@ public class RMAppAttemptImpl implements
@Override
public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
this.readLock.lock();
-
try {
- int numUsedContainers = 0;
- int numReservedContainers = 0;
- Resource currentConsumption = Resources.createResource(0, 0);
- Resource reservedResources = Resources.createResource(0, 0);
-
- SchedulerAppReport schedApp =
- scheduler.getSchedulerAppInfo(this.getAppAttemptId());
- Collection<RMContainer> liveContainers;
- Collection<RMContainer> reservedContainers;
- if (schedApp != null) {
- liveContainers = schedApp.getLiveContainers();
- reservedContainers = schedApp.getReservedContainers();
- if (liveContainers != null) {
- numUsedContainers = liveContainers.size();
- for (RMContainer lc : liveContainers) {
- Resources.addTo(currentConsumption, lc.getContainer().getResource());
- }
- }
- if (reservedContainers != null) {
- numReservedContainers = reservedContainers.size();
- for (RMContainer rc : reservedContainers) {
- Resources.addTo(reservedResources, rc.getContainer().getResource());
- }
- }
- }
-
- return BuilderUtils.newApplicationResourceUsageReport(
- numUsedContainers, numReservedContainers,
- currentConsumption, reservedResources,
- Resources.add(currentConsumption, reservedResources));
+ return scheduler.getAppResourceUsageReport(this.getAppAttemptId());
} finally {
this.readLock.unlock();
}
}
@Override
- public void recover(RMState state) throws Exception{
- ApplicationState appState =
+ public void recover(RMState state) throws Exception {
+ ApplicationState appState =
state.getApplicationState().get(getAppAttemptId().getApplicationId());
- ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId());
+ ApplicationAttemptState attemptState =
+ appState.getAttempt(getAppAttemptId());
assert attemptState != null;
+ LOG.info("Recovered attempt: AppId: "
+ + getAppAttemptId().getApplicationId() + " AttemptId: "
+ + getAppAttemptId() + " MasterContainer: " + masterContainer);
+ diagnostics.append("Attempt recovered after RM restart");
+ diagnostics.append(attemptState.getDiagnostics());
setMasterContainer(attemptState.getMasterContainer());
recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials());
- LOG.info("Recovered attempt: AppId: " + getAppAttemptId().getApplicationId()
- + " AttemptId: " + getAppAttemptId()
- + " MasterContainer: " + masterContainer);
- setDiagnostics("Attempt recovered after RM restart");
- handle(new RMAppAttemptEvent(getAppAttemptId(),
- RMAppAttemptEventType.RECOVER));
+ this.recoveredFinalState = attemptState.getState();
+ this.originalTrackingUrl = attemptState.getFinalTrackingUrl();
+ this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
+ this.finalStatus = attemptState.getFinalApplicationStatus();
+ this.startTime = attemptState.getStartTime();
+ handle(new RMAppAttemptEvent(getAppAttemptId(),
+ RMAppAttemptEventType.RECOVER));
}
private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
@@ -763,7 +773,7 @@ public class RMAppAttemptImpl implements
// Save the diagnostic message
String message = rejectedEvent.getMessage();
- appAttempt.setDiagnostics(message);
+ appAttempt.diagnostics.append(message);
// Send the rejection event to app
appAttempt.eventHandler.handle(
@@ -810,10 +820,8 @@ public class RMAppAttemptImpl implements
}
return RMAppAttemptState.SCHEDULED;
} else {
- // RM not allocating container. AM is self launched.
- RMStateStore store = appAttempt.rmContext.getStateStore();
// save state and then go to LAUNCHED state
- appAttempt.storeAttempt(store);
+ appAttempt.storeAttempt();
return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING;
}
}
@@ -838,8 +846,7 @@ public class RMAppAttemptImpl implements
0));
appAttempt.getSubmissionContext().setResource(
appAttempt.getMasterContainer().getResource());
- RMStateStore store = appAttempt.rmContext.getStateStore();
- appAttempt.storeAttempt(store);
+ appAttempt.storeAttempt();
}
}
@@ -851,6 +858,134 @@ public class RMAppAttemptImpl implements
appAttempt.launchAttempt();
}
}
+
+ private static class AttemptRecoveredTransition
+ implements
+ MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
+ @Override
+ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
+ RMAppAttemptEvent event) {
+ if (appAttempt.recoveredFinalState != null) {
+ appAttempt.progress = 1.0f;
+ return appAttempt.recoveredFinalState;
+ } else {
+ return RMAppAttemptState.RECOVERED;
+ }
+ }
+ }
+
+ private void rememberTargetTransitions(RMAppAttemptEvent event,
+ Object transitionToDo, RMAppAttemptState targetFinalState) {
+ transitionTodo = transitionToDo;
+ targetedFinalState = targetFinalState;
+ eventCausingFinalSaving = event;
+ }
+
+ private void rememberTargetTransitionsAndStoreState(RMAppAttemptEvent event,
+ Object transitionToDo, RMAppAttemptState targetFinalState,
+ RMAppAttemptState stateToBeStored) {
+
+ rememberTargetTransitions(event, transitionToDo, targetFinalState);
+
+ // As of today, finalState, diagnostics, final-tracking-url and
+ // finalAppStatus are the only things that we store into the StateStore
+ // AFTER the initial saving on app-attempt-start.
+ // These fields become visible from outside only after they are saved in
+ // the StateStore.
+ String diags = null;
+ String finalTrackingUrl = null;
+ FinalApplicationStatus finalStatus = null;
+
+ switch (event.getType()) {
+ case APP_REJECTED:
+ RMAppAttemptRejectedEvent rejectedEvent =
+ (RMAppAttemptRejectedEvent) event;
+ diags = rejectedEvent.getMessage();
+ break;
+ case LAUNCH_FAILED:
+ RMAppAttemptLaunchFailedEvent launchFaileEvent =
+ (RMAppAttemptLaunchFailedEvent) event;
+ diags = launchFaileEvent.getMessage();
+ break;
+ case REGISTERED:
+ diags = getUnexpectedAMRegisteredDiagnostics();
+ break;
+ case UNREGISTERED:
+ RMAppAttemptUnregistrationEvent unregisterEvent =
+ (RMAppAttemptUnregistrationEvent) event;
+ diags = unregisterEvent.getDiagnostics();
+ finalTrackingUrl = sanitizeTrackingUrl(unregisterEvent.getFinalTrackingUrl());
+ finalStatus = unregisterEvent.getFinalApplicationStatus();
+ break;
+ case CONTAINER_FINISHED:
+ RMAppAttemptContainerFinishedEvent finishEvent =
+ (RMAppAttemptContainerFinishedEvent) event;
+ diags = getAMContainerCrashedDiagnostics(finishEvent);
+ break;
+ case KILL:
+ break;
+ case EXPIRE:
+ diags = getAMExpiredDiagnostics(event);
+ break;
+ default:
+ break;
+ }
+
+ RMStateStore rmStore = rmContext.getStateStore();
+ ApplicationAttemptState attemptState =
+ new ApplicationAttemptState(applicationAttemptId, getMasterContainer(),
+ rmStore.getCredentialsFromAppAttempt(this), startTime,
+ stateToBeStored, finalTrackingUrl, diags, finalStatus);
+ LOG.info("Updating application attempt " + applicationAttemptId
+ + " with final state: " + targetedFinalState);
+ rmStore.updateApplicationAttemptState(attemptState);
+ }
+
+ private static class FinalSavingTransition extends BaseTransition {
+
+ Object transitionToDo;
+ RMAppAttemptState targetedFinalState;
+
+ public FinalSavingTransition(Object transitionToDo,
+ RMAppAttemptState targetedFinalState) {
+ this.transitionToDo = transitionToDo;
+ this.targetedFinalState = targetedFinalState;
+ }
+
+ @Override
+ public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
+ // For cases Killed/Failed, targetedFinalState is the same as the state to
+ // be stored
+ appAttempt.rememberTargetTransitionsAndStoreState(event, transitionToDo,
+ targetedFinalState, targetedFinalState);
+ }
+ }
+
+ private static class FinalStateSavedTransition implements
+ MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
+ @Override
+ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
+ RMAppAttemptEvent event) {
+ RMAppAttemptUpdateSavedEvent storeEvent = (RMAppAttemptUpdateSavedEvent) event;
+ if (storeEvent.getUpdatedException() != null) {
+ LOG.error("Failed to update the final state of application attempt: "
+ + storeEvent.getApplicationAttemptId(),
+ storeEvent.getUpdatedException());
+ ExitUtil.terminate(1, storeEvent.getUpdatedException());
+ }
+
+ RMAppAttemptEvent causeEvent = appAttempt.eventCausingFinalSaving;
+
+ if (appAttempt.transitionTodo instanceof SingleArcTransition) {
+ ((SingleArcTransition) appAttempt.transitionTodo).transition(
+ appAttempt, causeEvent);
+ } else if (appAttempt.transitionTodo instanceof MultipleArcTransition) {
+ ((MultipleArcTransition) appAttempt.transitionTodo).transition(
+ appAttempt, causeEvent);
+ }
+ return appAttempt.targetedFinalState;
+ }
+ }
private static class BaseFinalTransition extends BaseTransition {
@@ -998,15 +1133,20 @@ public class RMAppAttemptImpl implements
= (RMAppAttemptRegistrationEvent) event;
appAttempt.host = registrationEvent.getHost();
appAttempt.rpcPort = registrationEvent.getRpcport();
- appAttempt.origTrackingUrl =
+ appAttempt.originalTrackingUrl =
sanitizeTrackingUrl(registrationEvent.getTrackingurl());
appAttempt.proxiedTrackingUrl =
- appAttempt.generateProxyUriWithScheme(appAttempt.origTrackingUrl);
+ appAttempt.generateProxyUriWithScheme(appAttempt.originalTrackingUrl);
// Let the app know
appAttempt.eventHandler.handle(new RMAppEvent(appAttempt
.getAppAttemptId().getApplicationId(),
RMAppEventType.ATTEMPT_REGISTERED));
+
+ // TODO:FIXME: Note for future. Unfortunately we only do a state-store
+ // write at AM launch time, so we don't save the AM's tracking URL anywhere
+ // as that would mean an extra state-store write. For now, we hope that in
+ // work-preserving restart, AMs are forced to reregister.
}
}
@@ -1029,17 +1169,24 @@ public class RMAppAttemptImpl implements
appAttempt.getAppAttemptId());
// Setup diagnostic message
- ContainerStatus status = finishEvent.getContainerStatus();
- appAttempt.diagnostics.append("AM Container for " +
- appAttempt.getAppAttemptId() + " exited with " +
- " exitCode: " + status.getExitStatus() +
- " due to: " + status.getDiagnostics() + "." +
- "Failing this attempt.");
+ appAttempt.diagnostics
+ .append(getAMContainerCrashedDiagnostics(finishEvent));
// Tell the app, scheduler
super.transition(appAttempt, finishEvent);
}
}
+ private static String getAMContainerCrashedDiagnostics(
+ RMAppAttemptContainerFinishedEvent finishEvent) {
+ ContainerStatus status = finishEvent.getContainerStatus();
+ String diagnostics =
+ "AM Container for " + finishEvent.getApplicationAttemptId()
+ + " exited with " + " exitCode: " + status.getExitStatus()
+ + " due to: " + status.getDiagnostics() + "."
+ + "Failing this attempt.";
+ return diagnostics;
+ }
+
private static class FinalTransition extends BaseFinalTransition {
public FinalTransition(RMAppAttemptState finalAttemptState) {
@@ -1055,7 +1202,8 @@ public class RMAppAttemptImpl implements
// Tell the app and the scheduler
super.transition(appAttempt, event);
- // UnRegister from AMLivelinessMonitor
+ // UnRegister from AMLivelinessMonitor. Perhaps for
+ // FAILING/KILLED/UnManaged AMs
appAttempt.rmContext.getAMLivelinessMonitor().unregister(
appAttempt.getAppAttemptId());
appAttempt.rmContext.getAMFinishingMonitor().unregister(
@@ -1078,12 +1226,18 @@ public class RMAppAttemptImpl implements
@Override
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
- appAttempt.diagnostics.append("ApplicationMaster for attempt " +
- appAttempt.getAppAttemptId() + " timed out");
+ appAttempt.diagnostics.append(getAMExpiredDiagnostics(event));
super.transition(appAttempt, event);
}
}
+ private static String getAMExpiredDiagnostics(RMAppAttemptEvent event) {
+ String diag =
+ "ApplicationMaster for attempt " + event.getApplicationAttemptId()
+ + " timed out";
+ return diag;
+ }
+
private static class UnexpectedAMRegisteredTransition extends
BaseFinalTransition {
@@ -1094,13 +1248,16 @@ public class RMAppAttemptImpl implements
@Override
public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
assert appAttempt.submissionContext.getUnmanagedAM();
- appAttempt
- .setDiagnostics("Unmanaged AM must register after AM attempt reaches LAUNCHED state.");
+ appAttempt.diagnostics.append(getUnexpectedAMRegisteredDiagnostics());
super.transition(appAttempt, event);
}
}
+ private static String getUnexpectedAMRegisteredDiagnostics() {
+ return "Unmanaged AM must register after AM attempt reaches LAUNCHED state.";
+ }
+
private static final class StatusUpdateTransition extends
BaseTransition {
@Override
@@ -1125,38 +1282,62 @@ public class RMAppAttemptImpl implements
@Override
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
- ApplicationAttemptId appAttemptId = appAttempt.getAppAttemptId();
-
- appAttempt.rmContext.getAMLivelinessMonitor().unregister(appAttemptId);
-
- appAttempt.progress = 1.0f;
-
- RMAppAttemptUnregistrationEvent unregisterEvent
- = (RMAppAttemptUnregistrationEvent) event;
- appAttempt.diagnostics.append(unregisterEvent.getDiagnostics());
- appAttempt.origTrackingUrl =
- sanitizeTrackingUrl(unregisterEvent.getTrackingUrl());
- appAttempt.proxiedTrackingUrl =
- appAttempt.generateProxyUriWithScheme(appAttempt.origTrackingUrl);
- appAttempt.finalStatus = unregisterEvent.getFinalApplicationStatus();
-
// Tell the app
if (appAttempt.getSubmissionContext().getUnmanagedAM()) {
// Unmanaged AMs have no container to wait for, so they skip
// the FINISHING state and go straight to FINISHED.
+ appAttempt.updateInfoOnAMUnregister(event);
new FinalTransition(RMAppAttemptState.FINISHED).transition(
appAttempt, event);
return RMAppAttemptState.FINISHED;
}
- appAttempt.rmContext.getAMFinishingMonitor().register(appAttemptId);
+ // Saving the attempt final state
+ appAttempt.rememberTargetTransitionsAndStoreState(event,
+ new FinalStateSavedAfterAMUnregisterTransition(),
+ RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED);
ApplicationId applicationId =
appAttempt.getAppAttemptId().getApplicationId();
- appAttempt.eventHandler.handle(
- new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_UNREGISTERED));
- return RMAppAttemptState.FINISHING;
+
+ // Tell the app immediately that AM is unregistering so that app itself
+ // can save its state as soon as possible. Whether we do it like this, or
+ // we wait till AppAttempt is saved, it doesn't make any difference on the
+ // app side w.r.t failure conditions. The only event going out of
+ // AppAttempt to App after this point of time is AM/AppAttempt Finished.
+ appAttempt.eventHandler.handle(new RMAppEvent(applicationId,
+ RMAppEventType.ATTEMPT_UNREGISTERED));
+ return RMAppAttemptState.FINAL_SAVING;
}
}
+ private static class FinalStateSavedAfterAMUnregisterTransition extends
+ BaseTransition {
+ @Override
+ public void
+ transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
+ // Unregister from the AMLivelinessMonitor and register with AMFinishingMonitor
+ appAttempt.rmContext.getAMLivelinessMonitor().unregister(
+ appAttempt.applicationAttemptId);
+ appAttempt.rmContext.getAMFinishingMonitor().register(
+ appAttempt.applicationAttemptId);
+
+ // Do not make any more changes to this transition code. Make all changes
+ // in updateInfoOnAMUnregister() instead, unless you are absolutely sure
+ // that the new logic must not be shared with the other callers of that
+ // method.
+ appAttempt.updateInfoOnAMUnregister(event);
+ }
+ }
+
+ private void updateInfoOnAMUnregister(RMAppAttemptEvent event) {
+ progress = 1.0f;
+ RMAppAttemptUnregistrationEvent unregisterEvent =
+ (RMAppAttemptUnregistrationEvent) event;
+ diagnostics.append(unregisterEvent.getDiagnostics());
+ originalTrackingUrl = sanitizeTrackingUrl(unregisterEvent.getFinalTrackingUrl());
+ proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
+ finalStatus = unregisterEvent.getFinalApplicationStatus();
+ }
+
private static final class ContainerAcquiredTransition extends
BaseTransition {
@Override
@@ -1185,29 +1366,37 @@ public class RMAppAttemptImpl implements
// the AMContainer, AppAttempt fails
if (appAttempt.masterContainer != null
&& appAttempt.masterContainer.getId().equals(
- containerStatus.getContainerId())) {
- // container associated with AM. must not be unmanaged
- assert appAttempt.submissionContext.getUnmanagedAM() == false;
- // Setup diagnostic message
- appAttempt.diagnostics.append("AM Container for " +
- appAttempt.getAppAttemptId() + " exited with " +
- " exitCode: " + containerStatus.getExitStatus() +
- " due to: " + containerStatus.getDiagnostics() + "." +
- "Failing this attempt.");
-
- new FinalTransition(RMAppAttemptState.FAILED).transition(
- appAttempt, containerFinishedEvent);
- return RMAppAttemptState.FAILED;
+ containerStatus.getContainerId())) {
+ // Remember the follow up transition and save the final attempt state.
+ appAttempt.rememberTargetTransitionsAndStoreState(event,
+ new ContainerFinishedFinalStateSavedTransition(),
+ RMAppAttemptState.FAILED, RMAppAttemptState.FAILED);
+ return RMAppAttemptState.FINAL_SAVING;
}
- // Normal container.
-
- // Put it in completedcontainers list
+ // Normal container. Put it in the completed containers list
appAttempt.justFinishedContainers.add(containerStatus);
return RMAppAttemptState.RUNNING;
}
}
+ private static class ContainerFinishedFinalStateSavedTransition extends
+ BaseTransition {
+ @Override
+ public void
+ transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
+ RMAppAttemptContainerFinishedEvent containerFinishedEvent =
+ (RMAppAttemptContainerFinishedEvent) event;
+ // container associated with AM. must not be unmanaged
+ assert appAttempt.submissionContext.getUnmanagedAM() == false;
+ // Setup diagnostic message
+ appAttempt.diagnostics
+ .append(getAMContainerCrashedDiagnostics(containerFinishedEvent));
+ new FinalTransition(RMAppAttemptState.FAILED).transition(appAttempt,
+ event);
+ }
+ }
+
private static final class AMFinishingContainerFinishedTransition
implements
MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
@@ -1228,13 +1417,83 @@ public class RMAppAttemptImpl implements
appAttempt, containerFinishedEvent);
return RMAppAttemptState.FINISHED;
}
-
// Normal container.
appAttempt.justFinishedContainers.add(containerStatus);
return RMAppAttemptState.FINISHING;
}
}
+ private static class ContainerFinishedAtFinalSavingTransition extends
+ BaseTransition {
+ @Override
+ public void
+ transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
+ RMAppAttemptContainerFinishedEvent containerFinishedEvent =
+ (RMAppAttemptContainerFinishedEvent) event;
+ ContainerStatus containerStatus =
+ containerFinishedEvent.getContainerStatus();
+
+ // If this is the AM container, it means the AM container is finished,
+ // but we have not yet been notified that the final state has been saved.
+ // Thus, the attempt stays in the FINAL_SAVING state here.
+ if (appAttempt.masterContainer.getId().equals(
+ containerStatus.getContainerId())) {
+ if (appAttempt.targetedFinalState.equals(RMAppAttemptState.FAILED)
+ || appAttempt.targetedFinalState.equals(RMAppAttemptState.KILLED)) {
+ // ignore Container_Finished Event if we were supposed to reach
+ // FAILED/KILLED state.
+ return;
+ }
+
+ // pass in the earlier AMUnregistered Event also, as this is needed for
+ // AMFinishedAfterFinalSavingTransition later on
+ appAttempt.rememberTargetTransitions(event,
+ new AMFinishedAfterFinalSavingTransition(
+ appAttempt.eventCausingFinalSaving), RMAppAttemptState.FINISHED);
+ return;
+ }
+ // Normal container.
+ appAttempt.justFinishedContainers.add(containerStatus);
+ }
+ }
+
+ private static class AMFinishedAfterFinalSavingTransition extends
+ BaseTransition {
+ RMAppAttemptEvent amUnregisteredEvent;
+ public AMFinishedAfterFinalSavingTransition(
+ RMAppAttemptEvent amUnregisteredEvent) {
+ this.amUnregisteredEvent = amUnregisteredEvent;
+ }
+
+ @Override
+ public void
+ transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
+ appAttempt.updateInfoOnAMUnregister(amUnregisteredEvent);
+ new FinalTransition(RMAppAttemptState.FINISHED).transition(appAttempt,
+ event);
+ }
+ }
+
+ private static class AMExpiredAtFinalSavingTransition extends
+ BaseTransition {
+ @Override
+ public void
+ transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
+ if (appAttempt.targetedFinalState.equals(RMAppAttemptState.FAILED)
+ || appAttempt.targetedFinalState.equals(RMAppAttemptState.KILLED)) {
+ // ignore the AM expiry event if we were supposed to reach
+ // FAILED/KILLED state.
+ return;
+ }
+
+ // pass in the earlier AMUnregistered Event also, as this is needed for
+ // AMFinishedAfterFinalSavingTransition later on
+ appAttempt.rememberTargetTransitions(event,
+ new AMFinishedAfterFinalSavingTransition(
+ appAttempt.eventCausingFinalSaving), RMAppAttemptState.FINISHED);
+ }
+ }
+
@Override
public long getStartTime() {
this.readLock.lock();
@@ -1256,7 +1515,7 @@ public class RMAppAttemptImpl implements
}
private void checkAttemptStoreError(RMAppAttemptEvent event) {
- RMAppAttemptStoredEvent storeEvent = (RMAppAttemptStoredEvent) event;
+ RMAppAttemptNewSavedEvent storeEvent = (RMAppAttemptNewSavedEvent) event;
if(storeEvent.getStoredException() != null)
{
// This needs to be handled for HA and give up master status if we got
@@ -1267,7 +1526,7 @@ public class RMAppAttemptImpl implements
}
}
- private void storeAttempt(RMStateStore store) {
+ private void storeAttempt() {
// store attempt data in a non-blocking manner to prevent dispatcher
// thread starvation and wait for state to be saved
LOG.info("Storing attempt: AppId: " +
@@ -1275,7 +1534,7 @@ public class RMAppAttemptImpl implements
+ " AttemptId: " +
getAppAttemptId()
+ " MasterContainer: " + masterContainer);
- store.storeApplicationAttempt(this);
+ rmContext.getStateStore().storeNewApplicationAttempt(this);
}
private void removeCredentials(RMAppAttemptImpl appAttempt) {
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java Sun Nov 10 20:09:09 2013
@@ -20,5 +20,6 @@ package org.apache.hadoop.yarn.server.re
public enum RMAppAttemptState {
NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING, FINISHING,
- FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, RECOVERED
+ FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, RECOVERED,
+ FINAL_SAVING
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUnregistrationEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUnregistrationEvent.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUnregistrationEvent.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUnregistrationEvent.java Sun Nov 10 20:09:09 2013
@@ -25,20 +25,20 @@ import org.apache.hadoop.yarn.server.res
public class RMAppAttemptUnregistrationEvent extends RMAppAttemptEvent {
- private final String trackingUrl;
+ private final String finalTrackingUrl;
private final FinalApplicationStatus finalStatus;
private final String diagnostics;
public RMAppAttemptUnregistrationEvent(ApplicationAttemptId appAttemptId,
String trackingUrl, FinalApplicationStatus finalStatus, String diagnostics) {
super(appAttemptId, RMAppAttemptEventType.UNREGISTERED);
- this.trackingUrl = trackingUrl;
+ this.finalTrackingUrl = trackingUrl;
this.finalStatus = finalStatus;
this.diagnostics = diagnostics;
}
- public String getTrackingUrl() {
- return this.trackingUrl;
+ public String getFinalTrackingUrl() {
+ return this.finalTrackingUrl;
}
public FinalApplicationStatus getFinalApplicationStatus() {
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java Sun Nov 10 20:09:09 2013
@@ -26,6 +26,8 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
/**
@@ -35,6 +37,9 @@ import org.apache.hadoop.yarn.server.api
*/
public interface RMNode {
+ /** negative value means no timeout */
+ public static final int OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT = -1;
+
/**
* the node id of of this node.
* @return the node id of this node.
@@ -94,7 +99,19 @@ public interface RMNode {
* the total available resource.
* @return the total available resource.
*/
- public org.apache.hadoop.yarn.api.records.Resource getTotalCapability();
+ public Resource getTotalCapability();
+
+ /**
+ * Set resource option with total available resource and overCommitTimeoutMillis
+ * @param resourceOption
+ */
+ public void setResourceOption(ResourceOption resourceOption);
+
+ /**
+ * Get resource option with total available resource and overCommitTimeoutMillis
+ * @return ResourceOption
+ */
+ public ResourceOption getResourceOption();
/**
* The rack name for this node manager.
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Sun Nov 10 20:09:09 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -92,7 +93,7 @@ public class RMNodeImpl implements RMNod
private final int httpPort;
private final String nodeAddress; // The containerManager address
private final String httpAddress;
- private final Resource totalCapability;
+ private volatile ResourceOption resourceOption;
private final Node node;
private String healthReport;
@@ -173,13 +174,13 @@ public class RMNodeImpl implements RMNod
RMNodeEvent> stateMachine;
public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
- int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion) {
+ int cmPort, int httpPort, Node node, ResourceOption resourceOption, String nodeManagerVersion) {
this.nodeId = nodeId;
this.context = context;
this.hostName = hostName;
this.commandPort = cmPort;
this.httpPort = httpPort;
- this.totalCapability = capability;
+ this.resourceOption = resourceOption;
this.nodeAddress = hostName + ":" + cmPort;
this.httpAddress = hostName + ":" + httpPort;
this.node = node;
@@ -235,14 +236,24 @@ public class RMNodeImpl implements RMNod
@Override
public Resource getTotalCapability() {
- return this.totalCapability;
+ return this.resourceOption.getResource();
+ }
+
+ @Override
+ public void setResourceOption(ResourceOption resourceOption) {
+ this.resourceOption = resourceOption;
+ }
+
+ @Override
+ public ResourceOption getResourceOption(){
+ return this.resourceOption;
}
@Override
public String getRackName() {
return node.getNetworkLocation();
}
-
+
@Override
public Node getNode() {
return this.node;
@@ -438,7 +449,10 @@ public class RMNodeImpl implements RMNod
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeAddedSchedulerEvent(rmNode));
-
+ rmNode.context.getDispatcher().getEventHandler().handle(
+ new NodesListManagerEvent(
+ NodesListManagerEventType.NODE_USABLE, rmNode));
+
String host = rmNode.nodeId.getHost();
if (rmNode.context.getInactiveRMNodes().containsKey(host)) {
// Old node rejoining
@@ -471,7 +485,7 @@ public class RMNodeImpl implements RMNod
// Only add new node if old state is not UNHEALTHY
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeAddedSchedulerEvent(rmNode));
- }
+ }
} else {
// Reconnected node differs, so replace old node and start new node
switch (rmNode.getState()) {
@@ -486,6 +500,9 @@ public class RMNodeImpl implements RMNod
rmNode.context.getDispatcher().getEventHandler().handle(
new RMNodeEvent(newNode.getNodeID(), RMNodeEventType.STARTED));
}
+ rmNode.context.getDispatcher().getEventHandler().handle(
+ new NodesListManagerEvent(
+ NodesListManagerEventType.NODE_USABLE, rmNode));
}
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java Sun Nov 10 20:09:09 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -397,5 +398,12 @@ public abstract class SchedulerApplicati
lastScheduledContainer.put(priority, currentTimeMs);
schedulingOpportunities.setCount(priority, 0);
}
+
+ public synchronized ApplicationResourceUsageReport getResourceUsageReport() {
+ return ApplicationResourceUsageReport.newInstance(liveContainers.size(),
+ reservedContainers.size(), Resources.clone(currentConsumption),
+ Resources.clone(currentReservation),
+ Resources.add(currentConsumption, currentReservation));
+ }
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java Sun Nov 10 20:09:09 2013
@@ -67,6 +67,12 @@ public abstract class SchedulerNode {
* @return number of active containers on the node
*/
public abstract int getNumContainers();
+
+ /**
+ * Apply delta resource on node's available resource.
+ * @param deltaResource the delta of resource need to apply to node
+ */
+ public abstract void applyDeltaOnAvailableResource(Resource deltaResource);
/**
* Get total resources on the node.
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java Sun Nov 10 20:09:09 2013
@@ -19,20 +19,19 @@ package org.apache.hadoop.yarn.server.re
import java.util.List;
+import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
-import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -147,6 +146,37 @@ public class SchedulerUtils {
maximumResource, minimumResource);
ask.setCapability(normalized);
}
+
+ /**
+ * Sync a SchedulerNode's available resource with its RMNode's total capability.
+ * @param node SchedulerNode with the old resource view; updated in place
+ * @param rmNode RMNode carrying the new total capability
+ * @param clusterResource cluster-wide resource; the same delta is added to it
+ * @param log scheduler's log, used to record the resource change
+ */
+ public static void updateResourceIfChanged(SchedulerNode node,
+ RMNode rmNode, Resource clusterResource, Log log) {
+ Resource oldAvailableResource = node.getAvailableResource();
+ Resource newAvailableResource = Resources.subtract(
+ rmNode.getTotalCapability(), node.getUsedResource());
+
+ if (!newAvailableResource.equals(oldAvailableResource)) {
+ Resource deltaResource = Resources.subtract(newAvailableResource,
+ oldAvailableResource);
+ // Reflect resource change to scheduler node.
+ node.applyDeltaOnAvailableResource(deltaResource);
+ // Reflect resource change to clusterResource.
+ Resources.addTo(clusterResource, deltaResource);
+ // TODO process resource over-commitment case (allocated containers
+ // > total capacity) in different option by getting value of
+ // overCommitTimeoutMillis.
+
+ // Log resource change; CPU delta is in virtual cores, memory delta in MB.
+ log.info("Resource change on node: " + rmNode.getNodeAddress()
+ + " with delta: CPU: " + deltaResource.getVirtualCores() + "core, Memory: "
+ + deltaResource.getMemory() +"MB");
+ }
+ }
/**
* Utility method to normalize a list of resource requests, by insuring that
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Sun Nov 10 20:09:09 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -130,6 +131,16 @@ public interface YarnScheduler extends E
SchedulerAppReport getSchedulerAppInfo(ApplicationAttemptId appAttemptId);
/**
+ * Get a resource usage report from a given app attempt ID.
+ * @param appAttemptId the id of the application attempt
+ * @return resource usage report for this given attempt
+ */
+ @LimitedPrivate("yarn")
+ @Evolving
+ ApplicationResourceUsageReport getAppResourceUsageReport(
+ ApplicationAttemptId appAttemptId);
+
+ /**
* Get the root queue for the scheduler.
* @return the root queue for the scheduler.
*/
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Sun Nov 10 20:09:09 2013
@@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -628,6 +629,10 @@ public class CapacityScheduler
}
FiCaSchedulerNode node = getNode(nm.getNodeID());
+
+ // Update resource if any change
+ SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG);
+
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
@@ -694,7 +699,7 @@ public class CapacityScheduler
node.getReservedContainer().getContainerId().getApplicationAttemptId()
);
}
-
+
}
private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
@@ -856,6 +861,13 @@ public class CapacityScheduler
return app == null ? null : new SchedulerAppReport(app);
}
+ @Override
+ public ApplicationResourceUsageReport getAppResourceUsageReport(
+ ApplicationAttemptId applicationAttemptId) {
+ FiCaSchedulerApp app = getApplication(applicationAttemptId);
+ return app == null ? null : app.getResourceUsageReport();
+ }
+
@Lock(Lock.NoLock.class)
FiCaSchedulerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java Sun Nov 10 20:09:09 2013
@@ -268,4 +268,10 @@ public class FiCaSchedulerNode extends S
return reservedContainer;
}
+ @Override
+ public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) {
+ // we can only adjust available resource if total resource is changed.
+ Resources.addTo(this.availableResource, deltaResource);
+ }
+
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java Sun Nov 10 20:09:09 2013
@@ -269,4 +269,11 @@ public class FSSchedulerNode extends Sch
public synchronized AppSchedulable getReservedAppSchedulable() {
return reservedAppSchedulable;
}
+
+ @Override
+ public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) {
+ // we can only adjust available resource if total resource is changed.
+ Resources.addTo(this.availableResource, deltaResource);
+ }
+
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Sun Nov 10 20:09:09 2013
@@ -38,6 +38,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -181,6 +182,8 @@ public class FairScheduler implements Re
protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling
+ private Comparator nodeAvailableResourceComparator =
+ new NodeAvailableResourceComparator(); // Node available resource comparator
protected double nodeLocalityThreshold; // Cluster threshold for node locality
protected double rackLocalityThreshold; // Cluster threshold for rack locality
protected long nodeLocalityDelayMs; // Delay for node locality
@@ -917,6 +920,9 @@ public class FairScheduler implements Re
eventLog.log("HEARTBEAT", nm.getHostName());
FSSchedulerNode node = nodes.get(nm.getNodeID());
+ // Update resource if any change
+ SchedulerUtils.updateResourceIfChanged(node, nm, clusterCapacity, LOG);
+
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
@@ -948,14 +954,22 @@ public class FairScheduler implements Re
private void continuousScheduling() {
while (true) {
- for (FSSchedulerNode node : nodes.values()) {
- try {
- if (Resources.fitsIn(minimumAllocation, node.getAvailableResource())) {
- attemptScheduling(node);
+ List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
+ Collections.sort(nodeIdList, nodeAvailableResourceComparator);
+
+ // iterate all nodes
+ for (NodeId nodeId : nodeIdList) {
+ if (nodes.containsKey(nodeId)) {
+ FSSchedulerNode node = nodes.get(nodeId);
+ try {
+ if (Resources.fitsIn(minimumAllocation,
+ node.getAvailableResource())) {
+ attemptScheduling(node);
+ }
+ } catch (Throwable ex) {
+ LOG.warn("Error while attempting scheduling for node " + node +
+ ": " + ex.toString(), ex);
}
- } catch (Throwable ex) {
- LOG.warn("Error while attempting scheduling for node " + node + ": " +
- ex.toString(), ex);
}
}
try {
@@ -966,6 +980,17 @@ public class FairScheduler implements Re
}
}
}
+
+ /** Sort nodes by available resource */
+ private class NodeAvailableResourceComparator implements Comparator<NodeId> {
+
+ @Override
+ public int compare(NodeId n1, NodeId n2) {
+ return RESOURCE_CALCULATOR.compare(clusterCapacity,
+ nodes.get(n2).getAvailableResource(),
+ nodes.get(n1).getAvailableResource());
+ }
+ }
private synchronized void attemptScheduling(FSSchedulerNode node) {
// Assign new containers...
@@ -1030,6 +1055,17 @@ public class FairScheduler implements Re
return new SchedulerAppReport(applications.get(appAttemptId));
}
+ @Override
+ public ApplicationResourceUsageReport getAppResourceUsageReport(
+ ApplicationAttemptId appAttemptId) {
+ FSSchedulerApp app = applications.get(appAttemptId);
+ if (app == null) {
+ LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
+ return null;
+ }
+ return app.getResourceUsageReport();
+ }
+
/**
* Subqueue metrics might be a little out of date because fair shares are
* recalculated at the update interval, but the root queue metrics needs to
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Sun Nov 10 20:09:09 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -99,7 +100,7 @@ public class FifoScheduler implements Re
private final static List<Container> EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY);
private RMContext rmContext;
- private Map<NodeId, FiCaSchedulerNode> nodes = new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
+ protected Map<NodeId, FiCaSchedulerNode> nodes = new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
private boolean initialized;
private Resource minimumAllocation;
@@ -327,6 +328,13 @@ public class FifoScheduler implements Re
return app == null ? null : new SchedulerAppReport(app);
}
+ @Override
+ public ApplicationResourceUsageReport getAppResourceUsageReport(
+ ApplicationAttemptId applicationAttemptId) {
+ FiCaSchedulerApp app = getApplication(applicationAttemptId);
+ return app == null ? null : app.getResourceUsageReport();
+ }
+
private FiCaSchedulerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);
}
@@ -620,6 +628,9 @@ public class FifoScheduler implements Re
private synchronized void nodeUpdate(RMNode rmNode) {
FiCaSchedulerNode node = getNode(rmNode.getNodeID());
+ // Update resource if any change
+ SchedulerUtils.updateResourceIfChanged(node, rmNode, clusterResource, LOG);
+
List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
@@ -653,7 +664,7 @@ public class FifoScheduler implements Re
metrics.setAvailableResourcesToQueue(
Resources.subtract(clusterResource, usedResource));
- }
+ }
@Override
public void handle(SchedulerEvent event) {
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java Sun Nov 10 20:09:09 2013
@@ -77,7 +77,8 @@ public class FairSchedulerPage extends R
if (maxApps < Integer.MAX_VALUE) {
ri._("Max Running Applications:", qinfo.getMaxApplications());
}
- ri._("Fair Share:", qinfo.getFairShare().toString());
+ ri._("Fair Share:", StringEscapeUtils.escapeHtml(
+ qinfo.getFairShare().toString()));
html._(InfoBlock.class);
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java Sun Nov 10 20:09:09 2013
@@ -26,8 +26,10 @@ import javax.xml.bind.annotation.XmlTran
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@@ -71,6 +73,9 @@ public class AppInfo {
protected long elapsedTime;
protected String amContainerLogs;
protected String amHostHttpAddress;
+ protected int allocatedMB;
+ protected int allocatedVCores;
+ protected int runningContainers;
public AppInfo() {
} // JAXB needs this
@@ -132,6 +137,15 @@ public class AppInfo {
this.amContainerLogs = url;
this.amHostHttpAddress = masterContainer.getNodeHttpAddress();
}
+
+ ApplicationResourceUsageReport resourceReport = attempt
+ .getApplicationResourceUsageReport();
+ if (resourceReport != null) {
+ Resource usedResources = resourceReport.getUsedResources();
+ allocatedMB = usedResources.getMemory();
+ allocatedVCores = usedResources.getVirtualCores();
+ runningContainers = resourceReport.getNumUsedContainers();
+ }
}
}
}
@@ -224,5 +238,17 @@ public class AppInfo {
public String getApplicationType() {
return this.applicationType;
}
-
+
+ public int getRunningContainers() {
+ return this.runningContainers;
+ }
+
+ public int getAllocatedMB() {
+ return this.allocatedMB;
+ }
+
+ public int getAllocatedVCores() {
+ return this.allocatedVCores;
+ }
+
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java Sun Nov 10 20:09:09 2013
@@ -202,6 +202,12 @@ public class MockAM {
final FinishApplicationMasterRequest req =
FinishApplicationMasterRequest.newInstance(
FinalApplicationStatus.SUCCEEDED, "", "");
+ unregisterAppAttempt(req);
+ }
+
+ public void unregisterAppAttempt(final FinishApplicationMasterRequest req)
+ throws Exception {
+ waitForState(RMAppAttemptState.RUNNING);
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(attemptId.toString());
Token<AMRMTokenIdentifier> token =
@@ -216,4 +222,8 @@ public class MockAM {
}
});
}
+
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return this.attemptId;
+ }
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java Sun Nov 10 20:09:09 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -93,14 +94,14 @@ public class MockNodes {
private String nodeAddr;
private String httpAddress;
private int cmdPort;
- private Resource perNode;
+ private ResourceOption perNode;
private String rackName;
private String healthReport;
private long lastHealthReportTime;
private NodeState state;
public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
- Resource perNode, String rackName, String healthReport,
+ ResourceOption perNode, String rackName, String healthReport,
long lastHealthReportTime, int cmdPort, String hostName, NodeState state) {
this.nodeId = nodeId;
this.nodeAddr = nodeAddr;
@@ -146,7 +147,7 @@ public class MockNodes {
@Override
public Resource getTotalCapability() {
- return this.perNode;
+ return this.perNode.getResource();
}
@Override
@@ -202,6 +203,17 @@ public class MockNodes {
public long getLastHealthReportTime() {
return lastHealthReportTime;
}
+
+ @Override
+ public void setResourceOption(ResourceOption resourceOption) {
+ this.perNode = resourceOption;
+ }
+
+ @Override
+ public ResourceOption getResourceOption(){
+ return this.perNode;
+ }
+
};
private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr) {
@@ -220,8 +232,9 @@ public class MockNodes {
final String httpAddress = httpAddr;
String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe";
- return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode, rackName,
- healthReport, 0, nid, hostName, state);
+ return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress,
+ ResourceOption.newInstance(perNode, RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT),
+ rackName, healthReport, 0, nid, hostName, state);
}
public static RMNode nodeInfo(int rack, final Resource perNode,
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Sun Nov 10 20:09:09 2013
@@ -163,6 +163,14 @@ public class MockRM extends ResourceMana
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
int maxAppAttempts, Credentials ts, String appType) throws Exception {
+ return submitApp(masterMemory, name, user, acls, unmanaged, queue,
+ maxAppAttempts, ts, appType, true);
+ }
+
+ public RMApp submitApp(int masterMemory, String name, String user,
+ Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
+ int maxAppAttempts, Credentials ts, String appType,
+ boolean waitForAccepted) throws Exception {
ApplicationClientProtocol client = getClientRMService();
GetNewApplicationResponse resp = client.getNewApplication(Records
.newRecord(GetNewApplicationRequest.class));
@@ -222,7 +230,9 @@ public class MockRM extends ResourceMana
}.setClientReq(client, req);
fakeUser.doAs(action);
// make sure app is immediately available after submit
- waitForState(appId, RMAppState.ACCEPTED);
+ if (waitForAccepted) {
+ waitForState(appId, RMAppState.ACCEPTED);
+ }
return getRMContext().getRMApps().get(appId);
}