You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2014/01/10 16:05:19 UTC
svn commit: r1557144 [1/3] - in
/hadoop/common/branches/branch-2/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/
hadoop-yarn/hadoop-yarn-api/src/main/proto/
hadoop-yarn/hadoop-yarn-common/src/main/j...
Author: vinodkv
Date: Fri Jan 10 15:05:18 2014
New Revision: 1557144
URL: http://svn.apache.org/r1557144
Log:
YARN-1490. Introduced the ability to make ResourceManager optionally not kill all containers when an ApplicationMaster exits. Contributed by Jian He.
svn merge --ignore-ancestry -c 1557143 ../../trunk/
Added:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppStartAttemptEvent.java
- copied unchanged from r1557143, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppStartAttemptEvent.java
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppFailedAttemptEvent.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptRemovedSchedulerEvent.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Fri Jan 10 15:05:18 2014
@@ -40,6 +40,9 @@ Release 2.4.0 - UNRELEASED
YARN-1029. Added embedded leader election in the ResourceManager. (Karthik
Kambatla via vinodkv)
+ YARN-1490. Introduced the ability to make ResourceManager optionally not kill
+ all containers when an ApplicationMaster exits. (Jian He via vinodkv)
+
IMPROVEMENTS
YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java Fri Jan 10 15:05:18 2014
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.util.Records;
/**
@@ -57,7 +58,8 @@ public abstract class ApplicationSubmiss
ApplicationId applicationId, String applicationName, String queue,
Priority priority, ContainerLaunchContext amContainer,
boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
- int maxAppAttempts, Resource resource, String applicationType) {
+ int maxAppAttempts, Resource resource, String applicationType,
+ boolean keepContainers) {
ApplicationSubmissionContext context =
Records.newRecord(ApplicationSubmissionContext.class);
context.setApplicationId(applicationId);
@@ -70,6 +72,7 @@ public abstract class ApplicationSubmiss
context.setMaxAppAttempts(maxAppAttempts);
context.setResource(resource);
context.setApplicationType(applicationType);
+ context.setKeepContainersAcrossApplicationAttempts(keepContainers);
return context;
}
@@ -79,6 +82,18 @@ public abstract class ApplicationSubmiss
ApplicationId applicationId, String applicationName, String queue,
Priority priority, ContainerLaunchContext amContainer,
boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
+ int maxAppAttempts, Resource resource, String applicationType) {
+ return newInstance(applicationId, applicationName, queue, priority,
+ amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
+ resource, null, false);
+ }
+
+ @Public
+ @Stable
+ public static ApplicationSubmissionContext newInstance(
+ ApplicationId applicationId, String applicationName, String queue,
+ Priority priority, ContainerLaunchContext amContainer,
+ boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
int maxAppAttempts, Resource resource) {
return newInstance(applicationId, applicationName, queue, priority,
amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
@@ -268,4 +283,35 @@ public abstract class ApplicationSubmiss
@Public
@Stable
public abstract void setApplicationType(String applicationType);
+
+
+ /**
+ * Get the flag which indicates whether to keep containers across application
+ * attempts or not.
+ *
+ * @return the flag which indicates whether to keep containers across
+ * application attempts or not.
+ */
+ @Public
+ @Stable
+ public abstract boolean getKeepContainersAcrossApplicationAttempts();
+
+ /**
+ * Set the flag which indicates whether to keep containers across application
+ * attempts.
+ * <p>
+ * If the flag is true, running containers will not be killed when an
+ * application attempt fails; these containers will be retrieved by the new
+ * application attempt on registration via
+ * {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)}.
+ * </p>
+ *
+ * @param keepContainers
+ * the flag which indicates whether to keep containers across
+ * application attempts.
+ */
+ @Public
+ @Stable
+ public abstract void setKeepContainersAcrossApplicationAttempts(
+ boolean keepContainers);
}
\ No newline at end of file
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java Fri Jan 10 15:05:18 2014
@@ -46,10 +46,20 @@ public abstract class ContainerId implem
}
/**
- * Get the <code>ApplicationAttemptId</code> of the application to which
- * the <code>Container</code> was assigned.
- * @return <code>ApplicationAttemptId</code> of the application to which
- * the <code>Container</code> was assigned
+ * Get the <code>ApplicationAttemptId</code> of the application to which the
+ * <code>Container</code> was assigned.
+ * <p>
+ * Note: If containers are kept alive across application attempts via
+ * {@link ApplicationSubmissionContext#setKeepContainersAcrossApplicationAttempts(boolean)}
+ * the <code>ContainerId</code> does not necessarily contain the current
+ * running application attempt's <code>ApplicationAttemptId</code>. The
+ * container may have been allocated by a previously exited application
+ * attempt and be managed by the current running attempt, and thus carry the
+ * previous application attempt's <code>ApplicationAttemptId</code>.
+ * </p>
+ *
+ * @return <code>ApplicationAttemptId</code> of the application to which the
+ * <code>Container</code> was assigned
*/
@Public
@Stable
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto Fri Jan 10 15:05:18 2014
@@ -248,6 +248,7 @@ message ApplicationSubmissionContextProt
optional int32 maxAppAttempts = 8 [default = 0];
optional ResourceProto resource = 9;
optional string applicationType = 10 [default = "YARN"];
+ optional bool keep_containers_across_application_attempts = 11 [default = false];
}
enum ApplicationAccessTypeProto {
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java Fri Jan 10 15:05:18 2014
@@ -298,6 +298,19 @@ extends ApplicationSubmissionContext {
this.resource = resource;
}
+ @Override
+ public void
+ setKeepContainersAcrossApplicationAttempts(boolean keepContainers) {
+ maybeInitBuilder();
+ builder.setKeepContainersAcrossApplicationAttempts(keepContainers);
+ }
+
+ @Override
+ public boolean getKeepContainersAcrossApplicationAttempts() {
+ ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
+ return p.getKeepContainersAcrossApplicationAttempts();
+ }
+
private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
return new PriorityPBImpl(p);
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Fri Jan 10 15:05:18 2014
@@ -421,21 +421,26 @@ public class ApplicationMasterService ex
LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
throw e;
}
-
- try {
- RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
- } catch (InvalidContainerReleaseException e) {
- LOG.warn("Invalid container release by application " + appAttemptId, e);
- throw e;
+
+ RMApp app =
+ this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
+ // In the case of work-preserving AM restart, it's possible for the
+ // AM to release containers from the earlier attempt.
+ if (!app.getApplicationSubmissionContext()
+ .getKeepContainersAcrossApplicationAttempts()) {
+ try {
+ RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
+ } catch (InvalidContainerReleaseException e) {
+ LOG.warn("Invalid container release by application " + appAttemptId, e);
+ throw e;
+ }
}
-
+
// Send new requests to appAttempt.
Allocation allocation =
this.rScheduler.allocate(appAttemptId, ask, release,
blacklistAdditions, blacklistRemovals);
- RMApp app = this.rmContext.getRMApps().get(
- appAttemptId.getApplicationId());
RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
AllocateResponse allocateResponse =
@@ -591,4 +596,4 @@ public class ApplicationMasterService ex
this.response = response;
}
}
-}
\ No newline at end of file
+}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppFailedAttemptEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppFailedAttemptEvent.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppFailedAttemptEvent.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppFailedAttemptEvent.java Fri Jan 10 15:05:18 2014
@@ -23,14 +23,20 @@ import org.apache.hadoop.yarn.api.record
public class RMAppFailedAttemptEvent extends RMAppEvent {
private final String diagnostics;
+ private final boolean transferStateFromPreviousAttempt;
public RMAppFailedAttemptEvent(ApplicationId appId, RMAppEventType event,
- String diagnostics) {
+ String diagnostics, boolean transferStateFromPreviousAttempt) {
super(appId, event);
this.diagnostics = diagnostics;
+ this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
}
public String getDiagnostics() {
return this.diagnostics;
}
+
+ public boolean getTransferStateFromPreviousAttempt() {
+ return transferStateFromPreviousAttempt;
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Fri Jan 10 15:05:18 2014
@@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppStartAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -76,6 +77,7 @@ import org.apache.hadoop.yarn.state.Stat
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.resource.Resources;
+@SuppressWarnings({ "rawtypes", "unchecked" })
public class RMAppImpl implements RMApp, Recoverable {
private static final Log LOG = LogFactory.getLog(RMAppImpl.class);
@@ -633,7 +635,7 @@ public class RMAppImpl implements RMApp,
this.writeLock.unlock();
}
}
-
+
@Override
public void recover(RMState state) throws Exception{
ApplicationState appState = state.getApplicationState().get(getApplicationId());
@@ -646,26 +648,28 @@ public class RMAppImpl implements RMApp,
for(int i=0; i<appState.getAttemptCount(); ++i) {
// create attempt
- createNewAttempt(false);
+ createNewAttempt();
((RMAppAttemptImpl)this.currentAttempt).recover(state);
}
}
- @SuppressWarnings("unchecked")
- private void createNewAttempt(boolean startAttempt) {
+ private void createNewAttempt() {
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
RMAppAttempt attempt =
new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
- submissionContext, conf);
+ submissionContext, conf, maxAppAttempts == attempts.size());
attempts.put(appAttemptId, attempt);
currentAttempt = attempt;
- if(startAttempt) {
- handler.handle(
- new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
- }
}
+ private void
+ createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) {
+ createNewAttempt();
+ handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(),
+ transferStateFromPreviousAttempt));
+ }
+
private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) {
NodeState nodeState = node.getState();
updatedNodes.add(node);
@@ -688,7 +692,6 @@ public class RMAppImpl implements RMApp,
};
}
- @SuppressWarnings("unchecked")
private static final class RMAppRecoveredTransition implements
MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
@@ -729,7 +732,6 @@ public class RMAppImpl implements RMApp,
private static final class AddApplicationToSchedulerTransition extends
RMAppTransition {
- @SuppressWarnings("unchecked")
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
if (event instanceof RMAppNewSavedEvent) {
@@ -751,14 +753,13 @@ public class RMAppImpl implements RMApp,
private static final class StartAppAttemptTransition extends RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
- app.createNewAttempt(true);
+ app.createAndStartNewAttempt(false);
};
}
private static final class FinalStateSavedTransition implements
MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
- @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
RMAppUpdateSavedEvent storeEvent = (RMAppUpdateSavedEvent) event;
@@ -959,7 +960,6 @@ public class RMAppImpl implements RMApp,
}
private static class KillAttemptTransition extends RMAppTransition {
- @SuppressWarnings("unchecked")
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
app.stateBeforeKilling = app.getState();
@@ -987,7 +987,6 @@ public class RMAppImpl implements RMApp,
return nodes;
}
- @SuppressWarnings("unchecked")
public void transition(RMAppImpl app, RMAppEvent event) {
Set<NodeId> nodes = getNodesOnWhichAttemptRan(app);
for (NodeId nodeId : nodes) {
@@ -1019,7 +1018,21 @@ public class RMAppImpl implements RMApp,
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
if (!app.submissionContext.getUnmanagedAM()
&& app.attempts.size() < app.maxAppAttempts) {
- app.createNewAttempt(true);
+ boolean transferStateFromPreviousAttempt = false;
+ RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
+ transferStateFromPreviousAttempt =
+ failedEvent.getTransferStateFromPreviousAttempt();
+
+ RMAppAttempt oldAttempt = app.currentAttempt;
+ app.createAndStartNewAttempt(transferStateFromPreviousAttempt);
+ // Transfer the state from the previous attempt to the current attempt.
+ // Note that the previous failed attempt may still be collecting the
+ // container events from the scheduler and updating its data structures
+ // before the new attempt is created.
+ if (transferStateFromPreviousAttempt) {
+ ((RMAppAttemptImpl) app.currentAttempt)
+ .transferStateFromPreviousAttempt(oldAttempt);
+ }
return initialState;
} else {
app.rememberTargetTransitionsAndStoreState(event,
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Fri Jan 10 15:05:18 2014
@@ -129,9 +129,9 @@ public class RMAppAttemptImpl implements
private SecretKey clientTokenMasterKey = null;
//nodes on while this attempt's containers ran
- private final Set<NodeId> ranNodes =
+ private Set<NodeId> ranNodes =
new HashSet<NodeId>();
- private final List<ContainerStatus> justFinishedContainers =
+ private List<ContainerStatus> justFinishedContainers =
new ArrayList<ContainerStatus>();
private Container masterContainer;
@@ -148,7 +148,7 @@ public class RMAppAttemptImpl implements
private final StringBuilder diagnostics = new StringBuilder();
private Configuration conf;
-
+ private final boolean isLastAttempt;
private static final ExpiredTransition EXPIRED_TRANSITION =
new ExpiredTransition();
@@ -330,6 +330,12 @@ public class RMAppAttemptImpl implements
RMAppAttemptEventType.KILL))
// Transitions from FAILED State
+ // For work-preserving AM restart, a failed attempt still captures
+ // CONTAINER_FINISHED events and records the finished containers for
+ // use by the next new attempt.
+ .addTransition(RMAppAttemptState.FAILED, RMAppAttemptState.FAILED,
+ RMAppAttemptEventType.CONTAINER_FINISHED,
+ new ContainerFinishedAtFailedTransition())
.addTransition(
RMAppAttemptState.FAILED,
RMAppAttemptState.FAILED,
@@ -338,8 +344,7 @@ public class RMAppAttemptImpl implements
RMAppAttemptEventType.KILL,
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.STATUS_UPDATE,
- RMAppAttemptEventType.CONTAINER_ALLOCATED,
- RMAppAttemptEventType.CONTAINER_FINISHED))
+ RMAppAttemptEventType.CONTAINER_ALLOCATED))
// Transitions from FINISHING State
.addTransition(RMAppAttemptState.FINISHING,
@@ -390,7 +395,7 @@ public class RMAppAttemptImpl implements
RMContext rmContext, YarnScheduler scheduler,
ApplicationMasterService masterService,
ApplicationSubmissionContext submissionContext,
- Configuration conf) {
+ Configuration conf, boolean isLastAttempt) {
this.conf = conf;
this.applicationAttemptId = appAttemptId;
this.rmContext = rmContext;
@@ -404,7 +409,7 @@ public class RMAppAttemptImpl implements
this.writeLock = lock.writeLock();
this.proxiedTrackingUrl = generateProxyUriWithScheme(null);
-
+ this.isLastAttempt = isLastAttempt;
this.stateMachine = stateMachineFactory.make(this);
}
@@ -416,7 +421,7 @@ public class RMAppAttemptImpl implements
@Override
public ApplicationSubmissionContext getSubmissionContext() {
return this.submissionContext;
- }
+ }
@Override
public FinalApplicationStatus getFinalApplicationStatus() {
@@ -685,6 +690,11 @@ public class RMAppAttemptImpl implements
this.startTime = attemptState.getStartTime();
}
+ public void transferStateFromPreviousAttempt(RMAppAttempt attempt) {
+ this.justFinishedContainers = attempt.getJustFinishedContainers();
+ this.ranNodes = attempt.getRanNodes();
+ }
+
private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
throws IOException {
if (appAttemptTokens == null) {
@@ -721,6 +731,12 @@ public class RMAppAttemptImpl implements
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
+ boolean transferStateFromPreviousAttempt = false;
+ if (event instanceof RMAppStartAttemptEvent) {
+ transferStateFromPreviousAttempt =
+ ((RMAppStartAttemptEvent) event)
+ .getTransferStateFromPreviousAttempt();
+ }
appAttempt.startTime = System.currentTimeMillis();
// Register with the ApplicationMasterService
@@ -740,9 +756,10 @@ public class RMAppAttemptImpl implements
new Token<AMRMTokenIdentifier>(id,
appAttempt.rmContext.getAMRMTokenSecretManager());
- // Add the applicationAttempt to the scheduler
+ // Add the applicationAttempt to the scheduler and inform the scheduler
+ // whether to transfer the state from the previous attempt.
appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
- appAttempt.applicationAttemptId));
+ appAttempt.applicationAttemptId, transferStateFromPreviousAttempt));
}
}
@@ -981,6 +998,7 @@ public class RMAppAttemptImpl implements
// Tell the application and the scheduler
ApplicationId applicationId = appAttemptId.getApplicationId();
RMAppEvent appEvent = null;
+ boolean keepContainersAcrossAppAttempts = false;
switch (finalAttemptState) {
case FINISHED:
{
@@ -996,7 +1014,7 @@ public class RMAppAttemptImpl implements
appEvent =
new RMAppFailedAttemptEvent(applicationId,
RMAppEventType.ATTEMPT_KILLED,
- "Application killed by user.");
+ "Application killed by user.", false);
}
break;
case FAILED:
@@ -1004,10 +1022,17 @@ public class RMAppAttemptImpl implements
// don't leave the tracking URL pointing to a non-existent AM
appAttempt.setTrackingUrlToRMAppPage();
appAttempt.invalidateAMHostAndPort();
+ if (appAttempt.submissionContext
+ .getKeepContainersAcrossApplicationAttempts()
+ && !appAttempt.isLastAttempt
+ && !appAttempt.submissionContext.getUnmanagedAM()) {
+ keepContainersAcrossAppAttempts = true;
+ }
appEvent =
new RMAppFailedAttemptEvent(applicationId,
- RMAppEventType.ATTEMPT_FAILED,
- appAttempt.getDiagnostics());
+ RMAppEventType.ATTEMPT_FAILED, appAttempt.getDiagnostics(),
+ keepContainersAcrossAppAttempts);
+
}
break;
default:
@@ -1019,7 +1044,7 @@ public class RMAppAttemptImpl implements
appAttempt.eventHandler.handle(appEvent);
appAttempt.eventHandler.handle(new AppAttemptRemovedSchedulerEvent(
- appAttemptId, finalAttemptState));
+ appAttemptId, finalAttemptState, keepContainersAcrossAppAttempts));
appAttempt.removeCredentials(appAttempt);
}
}
@@ -1045,6 +1070,11 @@ public class RMAppAttemptImpl implements
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
appAttempt.checkAttemptStoreError(event);
+ // TODO: Today the unmanaged AM client waits for the app state to become
+ // Accepted before launching the AM. This is broken since we now start the
+ // attempt only after the application is Accepted. We may need to introduce
+ // an attempt report that the client can query for the attempt state in
+ // order to decide when to launch the unmanaged AM.
super.transition(appAttempt, event);
}
}
@@ -1346,6 +1376,20 @@ public class RMAppAttemptImpl implements
}
}
+ private static final class ContainerFinishedAtFailedTransition
+ extends BaseTransition {
+ @Override
+ public void
+ transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
+ RMAppAttemptContainerFinishedEvent containerFinishedEvent =
+ (RMAppAttemptContainerFinishedEvent) event;
+ ContainerStatus containerStatus =
+ containerFinishedEvent.getContainerStatus();
+ // Normal container. Add it to the list of just-finished containers.
+ appAttempt.justFinishedContainers.add(containerStatus);
+ }
+ }
+
private static class ContainerFinishedFinalStateSavedTransition extends
BaseTransition {
@Override
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Fri Jan 10 15:05:18 2014
@@ -59,10 +59,10 @@ public class AppSchedulingInfo {
final Set<Priority> priorities = new TreeSet<Priority>(
new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
- final Map<Priority, Map<String, ResourceRequest>> requests =
+ final Map<Priority, Map<String, ResourceRequest>> requests =
new HashMap<Priority, Map<String, ResourceRequest>>();
- final Set<String> blacklist = new HashSet<String>();
-
+ private Set<String> blacklist = new HashSet<String>();
+
//private final ApplicationStore store;
private final ActiveUsersManager activeUsersManager;
@@ -399,4 +399,15 @@ public class AppSchedulingInfo {
public synchronized void setQueue(Queue queue) {
this.queue = queue;
}
+
+ public synchronized Set<String> getBlackList() {
+ return this.blacklist;
+ }
+
+ public synchronized void transferStateFromPreviousAppSchedulingInfo(
+ AppSchedulingInfo appInfo) {
+ // this.priorities = appInfo.getPriorities();
+ // this.requests = appInfo.getRequests();
+ this.blacklist = appInfo.getBlackList();
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java Fri Jan 10 15:05:18 2014
@@ -26,6 +26,7 @@ public class SchedulerApplication {
private final Queue queue;
private final String user;
+ private SchedulerApplicationAttempt currentAttempt;
public SchedulerApplication(Queue queue, String user) {
this.queue = queue;
@@ -39,4 +40,12 @@ public class SchedulerApplication {
public String getUser() {
return user;
}
+
+ public SchedulerApplicationAttempt getCurrentAppAttempt() {
+ return currentAttempt;
+ }
+
+ public void setCurrentAppAttempt(SchedulerApplicationAttempt currentAttempt) {
+ this.currentAttempt = currentAttempt;
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Fri Jan 10 15:05:18 2014
@@ -64,7 +64,7 @@ public abstract class SchedulerApplicati
protected final AppSchedulingInfo appSchedulingInfo;
- protected final Map<ContainerId, RMContainer> liveContainers =
+ protected Map<ContainerId, RMContainer> liveContainers =
new HashMap<ContainerId, RMContainer>();
protected final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
new HashMap<Priority, Map<NodeId, RMContainer>>();
@@ -73,7 +73,7 @@ public abstract class SchedulerApplicati
protected final Resource currentReservation = Resource.newInstance(0, 0);
private Resource resourceLimit = Resource.newInstance(0, 0);
- protected final Resource currentConsumption = Resource.newInstance(0, 0);
+ protected Resource currentConsumption = Resource.newInstance(0, 0);
protected List<RMContainer> newlyAllocatedContainers =
new ArrayList<RMContainer>();
@@ -407,4 +407,29 @@ public abstract class SchedulerApplicati
Resources.add(currentConsumption, currentReservation));
}
+ public synchronized Map<ContainerId, RMContainer> getLiveContainersMap() {
+ return this.liveContainers;
+ }
+
+ public synchronized Resource getResourceLimit() {
+ return this.resourceLimit;
+ }
+
+ public synchronized Map<Priority, Long> getLastScheduledContainer() {
+ return this.lastScheduledContainer;
+ }
+
+ public synchronized void transferStateFromPreviousAttempt(
+ SchedulerApplicationAttempt appAttempt) {
+ this.liveContainers = appAttempt.getLiveContainersMap();
+ // this.reReservations = appAttempt.reReservations;
+ this.currentConsumption = appAttempt.getCurrentConsumption();
+ this.resourceLimit = appAttempt.getResourceLimit();
+ // this.currentReservation = appAttempt.currentReservation;
+ // this.newlyAllocatedContainers = appAttempt.newlyAllocatedContainers;
+ // this.schedulingOpportunities = appAttempt.schedulingOpportunities;
+ this.lastScheduledContainer = appAttempt.getLastScheduledContainer();
+ this.appSchedulingInfo
+ .transferStateFromPreviousAppSchedulingInfo(appAttempt.appSchedulingInfo);
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Fri Jan 10 15:05:18 2014
@@ -19,13 +19,13 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.io.IOException;
-import java.util.Collection;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
/**
@@ -170,4 +171,13 @@ public interface YarnScheduler extends E
@LimitedPrivate("yarn")
@Stable
public List<ApplicationAttemptId> getAppsInQueue(String queueName);
+
+ /**
+ * Get the container for the given containerId.
+ * @param containerId the ID of the container to look up
+ * @return the container for the given containerId.
+ */
+ @LimitedPrivate("yarn")
+ @Unstable
+ public RMContainer getRMContainer(ContainerId containerId);
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Fri Jan 10 15:05:18 2014
@@ -63,14 +63,15 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -194,10 +195,6 @@ public class CapacityScheduler
protected Map<ApplicationId, SchedulerApplication> applications =
new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
- @VisibleForTesting
- protected Map<ApplicationAttemptId, FiCaSchedulerApp> appAttempts =
- new ConcurrentHashMap<ApplicationAttemptId, FiCaSchedulerApp>();
-
private boolean initialized = false;
private ResourceCalculator calculator;
@@ -464,21 +461,27 @@ public class CapacityScheduler
}
private synchronized void addApplicationAttempt(
- ApplicationAttemptId applicationAttemptId) {
+ ApplicationAttemptId applicationAttemptId,
+ boolean transferStateFromPreviousAttempt) {
SchedulerApplication application =
applications.get(applicationAttemptId.getApplicationId());
CSQueue queue = (CSQueue) application.getQueue();
- FiCaSchedulerApp SchedulerApp =
+ FiCaSchedulerApp attempt =
new FiCaSchedulerApp(applicationAttemptId, application.getUser(),
queue, queue.getActiveUsersManager(), rmContext);
- appAttempts.put(applicationAttemptId, SchedulerApp);
- queue.submitApplicationAttempt(SchedulerApp, application.getUser());
+ if (transferStateFromPreviousAttempt) {
+ attempt.transferStateFromPreviousAttempt(application
+ .getCurrentAppAttempt());
+ }
+ application.setCurrentAppAttempt(attempt);
+
+ queue.submitApplicationAttempt(attempt, application.getUser());
LOG.info("Added Application Attempt " + applicationAttemptId
+ " to scheduler from user " + application.getUser() + " in queue "
+ queue.getQueueName());
- rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptEvent(applicationAttemptId,
+ rmContext.getDispatcher().getEventHandler() .handle(
+ new RMAppAttemptEvent(applicationAttemptId,
RMAppAttemptEventType.ATTEMPT_ADDED));
}
@@ -486,7 +489,8 @@ public class CapacityScheduler
RMAppState finalState) {
SchedulerApplication application = applications.get(applicationId);
if (application == null){
- // The AppRemovedSchedulerEvent maybe sent on recovery for completed apps.
+ // The AppRemovedSchedulerEvent may be sent on recovery for completed apps;
+ // ignore it.
return;
}
CSQueue queue = (CSQueue) application.getQueue();
@@ -501,52 +505,56 @@ public class CapacityScheduler
private synchronized void doneApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
- RMAppAttemptState rmAppAttemptFinalState) {
+ RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
LOG.info("Application Attempt " + applicationAttemptId + " is done." +
" finalState=" + rmAppAttemptFinalState);
- FiCaSchedulerApp application = getApplication(applicationAttemptId);
+ FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
+ SchedulerApplication application =
+ applications.get(applicationAttemptId.getApplicationId());
- if (application == null) {
- // throw new IOException("Unknown application " + applicationId +
- // " has completed!");
+ if (application == null || attempt == null) {
LOG.info("Unknown application " + applicationAttemptId + " has completed!");
return;
}
-
- // Release all the running containers
- for (RMContainer rmContainer : application.getLiveContainers()) {
- completedContainer(rmContainer,
- SchedulerUtils.createAbnormalContainerStatus(
- rmContainer.getContainerId(),
- SchedulerUtils.COMPLETED_APPLICATION),
- RMContainerEventType.KILL);
+
+ // Release all the allocated, acquired, running containers
+ for (RMContainer rmContainer : attempt.getLiveContainers()) {
+ if (keepContainers
+ && rmContainer.getState().equals(RMContainerState.RUNNING)) {
+ // do not kill the running container in the case of work-preserving AM
+ // restart.
+ LOG.info("Skip killing " + rmContainer.getContainerId());
+ continue;
+ }
+ completedContainer(
+ rmContainer,
+ SchedulerUtils.createAbnormalContainerStatus(
+ rmContainer.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
+ RMContainerEventType.KILL);
}
-
- // Release all reserved containers
- for (RMContainer rmContainer : application.getReservedContainers()) {
- completedContainer(rmContainer,
- SchedulerUtils.createAbnormalContainerStatus(
- rmContainer.getContainerId(),
- "Application Complete"),
- RMContainerEventType.KILL);
+
+ // Release all reserved containers
+ for (RMContainer rmContainer : attempt.getReservedContainers()) {
+ completedContainer(
+ rmContainer,
+ SchedulerUtils.createAbnormalContainerStatus(
+ rmContainer.getContainerId(), "Application Complete"),
+ RMContainerEventType.KILL);
}
-
+
// Clean up pending requests, metrics etc.
- application.stop(rmAppAttemptFinalState);
-
+ attempt.stop(rmAppAttemptFinalState);
+
// Inform the queue
- String queueName = application.getQueue().getQueueName();
+ String queueName = attempt.getQueue().getQueueName();
CSQueue queue = queues.get(queueName);
if (!(queue instanceof LeafQueue)) {
LOG.error("Cannot finish application " + "from non-leaf queue: "
+ queueName);
} else {
- queue.finishApplicationAttempt(application, queue.getQueueName());
+ queue.finishApplicationAttempt(attempt, queue.getQueueName());
}
-
- // Remove from our data-structure
- appAttempts.remove(applicationAttemptId);
}
private static final Allocation EMPTY_ALLOCATION =
@@ -558,7 +566,7 @@ public class CapacityScheduler
List<ResourceRequest> ask, List<ContainerId> release,
List<String> blacklistAdditions, List<String> blacklistRemovals) {
- FiCaSchedulerApp application = getApplication(applicationAttemptId);
+ FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
if (application == null) {
LOG.info("Calling allocate on removed " +
"or non existant application " + applicationAttemptId);
@@ -700,8 +708,8 @@ public class CapacityScheduler
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
- FiCaSchedulerApp reservedApplication =
- getApplication(reservedContainer.getApplicationAttemptId());
+ FiCaSchedulerApp reservedApplication =
+ getCurrentAttemptForContainer(reservedContainer.getContainerId());
// Try to fulfill the reservation
LOG.info("Trying to fulfill reservation for application " +
@@ -738,12 +746,11 @@ public class CapacityScheduler
private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
// Get the application for the finished container
- ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
- FiCaSchedulerApp application = getApplication(applicationAttemptId);
+ FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
if (application == null) {
- LOG.info("Unknown application: " + applicationAttemptId +
- " launched container " + containerId +
- " on node: " + node);
+ LOG.info("Unknown application "
+ + containerId.getApplicationAttemptId().getApplicationId()
+ + " launched container " + containerId + " on node: " + node);
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
return;
@@ -791,7 +798,8 @@ public class CapacityScheduler
{
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
(AppAttemptAddedSchedulerEvent) event;
- addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId());
+ addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
+ appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
}
break;
case APP_ATTEMPT_REMOVED:
@@ -799,7 +807,8 @@ public class CapacityScheduler
AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
(AppAttemptRemovedSchedulerEvent) event;
doneApplicationAttempt(appAttemptRemovedEvent.getApplicationAttemptID(),
- appAttemptRemovedEvent.getFinalAttemptState());
+ appAttemptRemovedEvent.getFinalAttemptState(),
+ appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
}
break;
case CONTAINER_EXPIRED:
@@ -874,13 +883,13 @@ public class CapacityScheduler
Container container = rmContainer.getContainer();
// Get the application for the finished container
- ApplicationAttemptId applicationAttemptId =
- container.getId().getApplicationAttemptId();
- FiCaSchedulerApp application = getApplication(applicationAttemptId);
+ FiCaSchedulerApp application =
+ getCurrentAttemptForContainer(container.getId());
+ ApplicationId appId =
+ container.getId().getApplicationAttemptId().getApplicationId();
if (application == null) {
- LOG.info("Container " + container + " of" +
- " unknown application " + applicationAttemptId +
- " completed with event " + event);
+ LOG.info("Container " + container + " of" + " unknown application "
+ + appId + " completed with event " + event);
return;
}
@@ -892,28 +901,33 @@ public class CapacityScheduler
queue.completedContainer(clusterResource, application, node,
rmContainer, containerStatus, event, null);
- LOG.info("Application " + applicationAttemptId +
- " released container " + container.getId() +
- " on node: " + node +
- " with event: " + event);
+ LOG.info("Application attempt " + application.getApplicationAttemptId()
+ + " released container " + container.getId() + " on node: " + node
+ + " with event: " + event);
}
@Lock(Lock.NoLock.class)
- FiCaSchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) {
- return appAttempts.get(applicationAttemptId);
+ FiCaSchedulerApp getApplicationAttempt(
+ ApplicationAttemptId applicationAttemptId) {
+ SchedulerApplication app =
+ applications.get(applicationAttemptId.getApplicationId());
+ if (app != null) {
+ return (FiCaSchedulerApp) app.getCurrentAppAttempt();
+ }
+ return null;
}
@Override
public SchedulerAppReport getSchedulerAppInfo(
ApplicationAttemptId applicationAttemptId) {
- FiCaSchedulerApp app = getApplication(applicationAttemptId);
+ FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
return app == null ? null : new SchedulerAppReport(app);
}
@Override
public ApplicationResourceUsageReport getAppResourceUsageReport(
ApplicationAttemptId applicationAttemptId) {
- FiCaSchedulerApp app = getApplication(applicationAttemptId);
+ FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
return app == null ? null : app.getResourceUsageReport();
}
@@ -922,10 +936,22 @@ public class CapacityScheduler
return nodes.get(nodeId);
}
- private RMContainer getRMContainer(ContainerId containerId) {
- FiCaSchedulerApp application =
- getApplication(containerId.getApplicationAttemptId());
- return (application == null) ? null : application.getRMContainer(containerId);
+ @Override
+ public RMContainer getRMContainer(ContainerId containerId) {
+ FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
+ return (attempt == null) ? null : attempt.getRMContainer(containerId);
+ }
+
+ @VisibleForTesting
+ public FiCaSchedulerApp getCurrentAttemptForContainer(
+ ContainerId containerId) {
+ SchedulerApplication app =
+ applications.get(containerId.getApplicationAttemptId()
+ .getApplicationId());
+ if (app != null) {
+ return (FiCaSchedulerApp) app.getCurrentAppAttempt();
+ }
+ return null;
}
@Override
@@ -958,7 +984,7 @@ public class CapacityScheduler
LOG.debug("PREEMPT_CONTAINER: application:" + aid.toString() +
" container: " + cont.toString());
}
- FiCaSchedulerApp app = appAttempts.get(aid);
+ FiCaSchedulerApp app = getApplicationAttempt(aid);
if (app != null) {
app.addPreemptContainer(cont.getContainerId());
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java Fri Jan 10 15:05:18 2014
@@ -219,7 +219,8 @@ public class FiCaSchedulerNode extends S
" on node " + this.reservedContainer.getReservedNode());
}
- // Cannot reserve more than one application on a given node!
+ // Cannot reserve more than one application attempt on a given node!
+ // Reservations are still tracked per attempt, not per application.
if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals(
reservedContainer.getContainer().getId().getApplicationAttemptId())) {
throw new IllegalStateException("Trying to reserve" +
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java Fri Jan 10 15:05:18 2014
@@ -23,14 +23,21 @@ import org.apache.hadoop.yarn.api.record
public class AppAttemptAddedSchedulerEvent extends SchedulerEvent {
private final ApplicationAttemptId applicationAttemptId;
+ private final boolean transferStateFromPreviousAttempt;
public AppAttemptAddedSchedulerEvent(
- ApplicationAttemptId applicationAttemptId) {
+ ApplicationAttemptId applicationAttemptId,
+ boolean transferStateFromPreviousAttempt) {
super(SchedulerEventType.APP_ATTEMPT_ADDED);
this.applicationAttemptId = applicationAttemptId;
+ this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
}
public ApplicationAttemptId getApplicationAttemptId() {
return applicationAttemptId;
}
+
+ public boolean getTransferStateFromPreviousAttempt() {
+ return transferStateFromPreviousAttempt;
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptRemovedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptRemovedSchedulerEvent.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptRemovedSchedulerEvent.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptRemovedSchedulerEvent.java Fri Jan 10 15:05:18 2014
@@ -25,13 +25,15 @@ public class AppAttemptRemovedSchedulerE
private final ApplicationAttemptId applicationAttemptId;
private final RMAppAttemptState finalAttemptState;
+ private final boolean keepContainersAcrossAppAttempts;
public AppAttemptRemovedSchedulerEvent(
ApplicationAttemptId applicationAttemptId,
- RMAppAttemptState finalAttemptState) {
+ RMAppAttemptState finalAttemptState, boolean keepContainers) {
super(SchedulerEventType.APP_ATTEMPT_REMOVED);
this.applicationAttemptId = applicationAttemptId;
this.finalAttemptState = finalAttemptState;
+ this.keepContainersAcrossAppAttempts = keepContainers;
}
public ApplicationAttemptId getApplicationAttemptID() {
@@ -41,4 +43,8 @@ public class AppAttemptRemovedSchedulerE
public RMAppAttemptState getFinalAttemptState() {
return this.finalAttemptState;
}
+
+ public boolean getKeepContainersAcrossAppAttempts() {
+ return this.keepContainersAcrossAppAttempts;
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Fri Jan 10 15:05:18 2014
@@ -162,12 +162,6 @@ public class FairScheduler implements Re
protected Map<ApplicationId, SchedulerApplication> applications =
new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
- // This stores per-application-attempt scheduling information, indexed by
- // attempt ID's for fast lookup.
- @VisibleForTesting
- protected Map<ApplicationAttemptId, FSSchedulerApp> appAttempts =
- new ConcurrentHashMap<ApplicationAttemptId, FSSchedulerApp>();
-
// Nodes in the cluster, indexed by NodeId
private Map<NodeId, FSSchedulerNode> nodes =
new ConcurrentHashMap<NodeId, FSSchedulerNode>();
@@ -262,10 +256,21 @@ public class FairScheduler implements Re
return queueMgr;
}
- private RMContainer getRMContainer(ContainerId containerId) {
- FSSchedulerApp application =
- appAttempts.get(containerId.getApplicationAttemptId());
- return (application == null) ? null : application.getRMContainer(containerId);
+ @Override
+ public RMContainer getRMContainer(ContainerId containerId) {
+ FSSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
+ return (attempt == null) ? null : attempt.getRMContainer(containerId);
+ }
+
+ private FSSchedulerApp getCurrentAttemptForContainer(
+ ContainerId containerId) {
+ SchedulerApplication app =
+ applications.get(containerId.getApplicationAttemptId()
+ .getApplicationId());
+ if (app != null) {
+ return (FSSchedulerApp) app.getCurrentAppAttempt();
+ }
+ return null;
}
/**
@@ -640,7 +645,8 @@ public class FairScheduler implements Re
applications.put(applicationId, application);
LOG.info("Accepted application " + applicationId + " from user: " + user
- + ", in queue: " + queueName);
+ + ", in queue: " + queueName + ", currently num of applications: "
+ + applications.size());
rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
}
@@ -649,31 +655,35 @@ public class FairScheduler implements Re
* Add a new application attempt to the scheduler.
*/
protected synchronized void addApplicationAttempt(
- ApplicationAttemptId applicationAttemptId) {
+ ApplicationAttemptId applicationAttemptId,
+ boolean transferStateFromPreviousAttempt) {
SchedulerApplication application =
applications.get(applicationAttemptId.getApplicationId());
String user = application.getUser();
FSLeafQueue queue = (FSLeafQueue) application.getQueue();
- FSSchedulerApp schedulerApp =
+ FSSchedulerApp attempt =
new FSSchedulerApp(applicationAttemptId, user,
queue, new ActiveUsersManager(getRootQueueMetrics()),
rmContext);
+ if (transferStateFromPreviousAttempt) {
+ attempt.transferStateFromPreviousAttempt(application
+ .getCurrentAppAttempt());
+ }
+ application.setCurrentAppAttempt(attempt);
boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
- queue.addApp(schedulerApp, runnable);
+ queue.addApp(attempt, runnable);
if (runnable) {
- maxRunningEnforcer.trackRunnableApp(schedulerApp);
+ maxRunningEnforcer.trackRunnableApp(attempt);
} else {
- maxRunningEnforcer.trackNonRunnableApp(schedulerApp);
+ maxRunningEnforcer.trackNonRunnableApp(attempt);
}
queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId());
- appAttempts.put(applicationAttemptId, schedulerApp);
LOG.info("Added Application Attempt " + applicationAttemptId
- + " to scheduler from user: " + user + ", currently active: "
- + appAttempts.size());
+ + " to scheduler from user: " + user);
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(applicationAttemptId,
RMAppAttemptEventType.ATTEMPT_ADDED));
@@ -709,19 +719,27 @@ public class FairScheduler implements Re
private synchronized void removeApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
- RMAppAttemptState rmAppAttemptFinalState) {
+ RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
LOG.info("Application " + applicationAttemptId + " is done." +
" finalState=" + rmAppAttemptFinalState);
+ SchedulerApplication application =
+ applications.get(applicationAttemptId.getApplicationId());
+ FSSchedulerApp attempt = getSchedulerApp(applicationAttemptId);
- FSSchedulerApp application = appAttempts.get(applicationAttemptId);
-
- if (application == null) {
+ if (attempt == null || application == null) {
LOG.info("Unknown application " + applicationAttemptId + " has completed!");
return;
}
// Release all the running containers
- for (RMContainer rmContainer : application.getLiveContainers()) {
+ for (RMContainer rmContainer : attempt.getLiveContainers()) {
+ if (keepContainers
+ && rmContainer.getState().equals(RMContainerState.RUNNING)) {
+ // do not kill the running container in the case of work-preserving AM
+ // restart.
+ LOG.info("Skip killing " + rmContainer.getContainerId());
+ continue;
+ }
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(),
@@ -730,30 +748,26 @@ public class FairScheduler implements Re
}
// Release all reserved containers
- for (RMContainer rmContainer : application.getReservedContainers()) {
+ for (RMContainer rmContainer : attempt.getReservedContainers()) {
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(),
"Application Complete"),
- RMContainerEventType.KILL);
+ RMContainerEventType.KILL);
}
-
// Clean up pending requests, metrics etc.
- application.stop(rmAppAttemptFinalState);
+ attempt.stop(rmAppAttemptFinalState);
// Inform the queue
- FSLeafQueue queue = queueMgr.getLeafQueue(application.getQueue()
+ FSLeafQueue queue = queueMgr.getLeafQueue(attempt.getQueue()
.getQueueName(), false);
- boolean wasRunnable = queue.removeApp(application);
+ boolean wasRunnable = queue.removeApp(attempt);
if (wasRunnable) {
- maxRunningEnforcer.updateRunnabilityOnAppRemoval(application);
+ maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt);
} else {
- maxRunningEnforcer.untrackNonRunnableApp(application);
+ maxRunningEnforcer.untrackNonRunnableApp(attempt);
}
-
- // Remove from our data-structure
- appAttempts.remove(applicationAttemptId);
}
/**
@@ -769,11 +783,13 @@ public class FairScheduler implements Re
Container container = rmContainer.getContainer();
// Get the application for the finished container
- ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
- FSSchedulerApp application = appAttempts.get(applicationAttemptId);
+ FSSchedulerApp application =
+ getCurrentAttemptForContainer(container.getId());
+ ApplicationId appId =
+ container.getId().getApplicationAttemptId().getApplicationId();
if (application == null) {
LOG.info("Container " + container + " of" +
- " unknown application " + applicationAttemptId +
+ " unknown application attempt " + appId +
" completed with event " + event);
return;
}
@@ -790,10 +806,9 @@ public class FairScheduler implements Re
updateRootQueueMetrics();
}
- LOG.info("Application " + applicationAttemptId +
- " released container " + container.getId() +
- " on node: " + node +
- " with event: " + event);
+ LOG.info("Application attempt " + application.getApplicationAttemptId()
+ + " released container " + container.getId() + " on node: " + node
+ + " with event: " + event);
}
private synchronized void addNode(RMNode node) {
@@ -844,7 +859,7 @@ public class FairScheduler implements Re
List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
// Make sure this application exists
- FSSchedulerApp application = appAttempts.get(appAttemptId);
+ FSSchedulerApp application = getSchedulerApp(appAttemptId);
if (application == null) {
LOG.info("Calling allocate on removed " +
"or non existant application " + appAttemptId);
@@ -914,12 +929,11 @@ public class FairScheduler implements Re
*/
private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
// Get the application for the finished container
- ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
- FSSchedulerApp application = appAttempts.get(applicationAttemptId);
+ FSSchedulerApp application = getCurrentAttemptForContainer(containerId);
if (application == null) {
- LOG.info("Unknown application: " + applicationAttemptId +
- " launched container " + containerId +
- " on node: " + node);
+ LOG.info("Unknown application "
+ + containerId.getApplicationAttemptId().getApplicationId()
+ + " launched container " + containerId + " on node: " + node);
return;
}
@@ -1058,28 +1072,34 @@ public class FairScheduler implements Re
}
public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) {
- return appAttempts.get(appAttemptId);
+ SchedulerApplication app =
+ applications.get(appAttemptId.getApplicationId());
+ if (app != null) {
+ return (FSSchedulerApp) app.getCurrentAppAttempt();
+ }
+ return null;
}
@Override
public SchedulerAppReport getSchedulerAppInfo(
ApplicationAttemptId appAttemptId) {
- if (!appAttempts.containsKey(appAttemptId)) {
+ FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
+ if (attempt == null) {
LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
return null;
}
- return new SchedulerAppReport(appAttempts.get(appAttemptId));
+ return new SchedulerAppReport(attempt);
}
@Override
public ApplicationResourceUsageReport getAppResourceUsageReport(
ApplicationAttemptId appAttemptId) {
- FSSchedulerApp app = appAttempts.get(appAttemptId);
- if (app == null) {
+ FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
+ if (attempt == null) {
LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
return null;
}
- return app.getResourceUsageReport();
+ return attempt.getResourceUsageReport();
}
/**
@@ -1145,7 +1165,8 @@ public class FairScheduler implements Re
}
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
(AppAttemptAddedSchedulerEvent) event;
- addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId());
+ addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
+ appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
break;
case APP_ATTEMPT_REMOVED:
if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {
@@ -1153,8 +1174,10 @@ public class FairScheduler implements Re
}
AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
(AppAttemptRemovedSchedulerEvent) event;
- removeApplicationAttempt(appAttemptRemovedEvent.getApplicationAttemptID(),
- appAttemptRemovedEvent.getFinalAttemptState());
+ removeApplicationAttempt(
+ appAttemptRemovedEvent.getApplicationAttemptID(),
+ appAttemptRemovedEvent.getFinalAttemptState(),
+ appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
break;
case CONTAINER_EXPIRED:
if (!(event instanceof ContainerExpiredSchedulerEvent)) {