You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cl...@apache.org on 2014/06/16 20:14:12 UTC
svn commit: r1602947 [3/5] - in
/hadoop/common/branches/fs-encryption/hadoop-yarn-project: ./
hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/ha...
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java Mon Jun 16 18:13:57 2014
@@ -20,21 +20,15 @@ package org.apache.hadoop.yarn.server.re
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMAppStateProto;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-public class ApplicationStateDataPBImpl
-extends ProtoBase<ApplicationStateDataProto>
-implements ApplicationStateData {
- private static final RecordFactory recordFactory = RecordFactoryProvider
- .getRecordFactory(null);
+import com.google.protobuf.TextFormat;
+public class ApplicationStateDataPBImpl extends ApplicationStateData {
ApplicationStateDataProto proto =
ApplicationStateDataProto.getDefaultInstance();
ApplicationStateDataProto.Builder builder = null;
@@ -51,7 +45,8 @@ implements ApplicationStateData {
this.proto = proto;
viaProto = true;
}
-
+
+ @Override
public ApplicationStateDataProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
@@ -136,7 +131,7 @@ implements ApplicationStateData {
}
applicationSubmissionContext =
new ApplicationSubmissionContextPBImpl(
- p.getApplicationSubmissionContext());
+ p.getApplicationSubmissionContext());
return applicationSubmissionContext;
}
@@ -200,21 +195,24 @@ implements ApplicationStateData {
builder.setFinishTime(finishTime);
}
- public static ApplicationStateData newApplicationStateData(long submitTime,
- long startTime, String user,
- ApplicationSubmissionContext submissionContext, RMAppState state,
- String diagnostics, long finishTime) {
-
- ApplicationStateData appState =
- recordFactory.newRecordInstance(ApplicationStateData.class);
- appState.setSubmitTime(submitTime);
- appState.setStartTime(startTime);
- appState.setUser(user);
- appState.setApplicationSubmissionContext(submissionContext);
- appState.setState(state);
- appState.setDiagnostics(diagnostics);
- appState.setFinishTime(finishTime);
- return appState;
+  /** Hashes the merged protobuf so that equal records hash equally. */
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  /**
+   * Two records are equal when both are {@code ApplicationStateDataPBImpl}
+   * instances whose merged protobufs are equal.
+   *
+   * @param other object to compare with; may be {@code null}
+   * @return {@code true} if {@code other} carries an equal protobuf
+   */
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    // instanceof on the concrete PB type keeps equals symmetric and never
+    // throws: a Class.isAssignableFrom + getClass().cast pair would throw
+    // ClassCastException for e.g. equals(new Object()).
+    if (!(other instanceof ApplicationStateDataPBImpl)) {
+      return false;
+    }
+    return getProto().equals(((ApplicationStateDataPBImpl) other).getProto());
+  }
+
+  /** Renders the merged protobuf as a compact single-line string. */
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
}
private static String RM_APP_PREFIX = "RMAPP_";
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Mon Jun 16 18:13:57 2014
@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -293,11 +294,6 @@ public class RMAppImpl implements RMApp,
private final StateMachine<RMAppState, RMAppEventType, RMAppEvent>
stateMachine;
- private static final ApplicationResourceUsageReport
- DUMMY_APPLICATION_RESOURCE_USAGE_REPORT =
- BuilderUtils.newApplicationResourceUsageReport(-1, -1,
- Resources.createResource(-1, -1), Resources.createResource(-1, -1),
- Resources.createResource(-1, -1));
private static final int DUMMY_APPLICATION_ATTEMPT_NUMBER = -1;
public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
@@ -498,7 +494,7 @@ public class RMAppImpl implements RMApp,
String origTrackingUrl = UNAVAILABLE;
int rpcPort = -1;
ApplicationResourceUsageReport appUsageReport =
- DUMMY_APPLICATION_RESOURCE_USAGE_REPORT;
+ RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT;
FinalApplicationStatus finishState = getFinalApplicationStatus();
String diags = UNAVAILABLE;
float progress = 0.0f;
@@ -723,29 +719,36 @@ public class RMAppImpl implements RMApp,
}
}
+  /**
+   * Sends a RECOVER event to every attempt of this application by calling each
+   * attempt's handle() directly (synchronously), so that an attempt has
+   * processed its recovery before any external event arriving afterwards.
+   */
+  private void recoverAppAttempts() {
+    for (RMAppAttempt appAttempt : getAppAttempts().values()) {
+      RMAppAttemptEvent recoverEvent = new RMAppAttemptEvent(
+          appAttempt.getAppAttemptId(), RMAppAttemptEventType.RECOVER);
+      appAttempt.handle(recoverEvent);
+    }
+  }
+
private static final class RMAppRecoveredTransition implements
MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
- for (RMAppAttempt attempt : app.getAppAttempts().values()) {
- // synchronously recover attempt to ensure any incoming external events
- // to be processed after the attempt processes the recover event.
- attempt.handle(
- new RMAppAttemptEvent(attempt.getAppAttemptId(),
- RMAppAttemptEventType.RECOVER));
- }
-
// The app has completed.
if (app.recoveredFinalState != null) {
+ app.recoverAppAttempts();
new FinalTransition(app.recoveredFinalState).transition(app, event);
return app.recoveredFinalState;
}
- // Last attempt is in final state, do not add to scheduler and just return
- // ACCEPTED waiting for last RMAppAttempt to send finished or failed event
- // back.
+ // Notify scheduler about the app on recovery
+ new AddApplicationToSchedulerTransition().transition(app, event);
+
+ // recover attempts
+ app.recoverAppAttempts();
+
+ // Last attempt is in final state, return ACCEPTED waiting for last
+ // RMAppAttempt to send finished or failed event back.
if (app.currentAttempt != null
&& (app.currentAttempt.getState() == RMAppAttemptState.KILLED
|| app.currentAttempt.getState() == RMAppAttemptState.FINISHED
@@ -754,9 +757,6 @@ public class RMAppImpl implements RMApp,
return RMAppState.ACCEPTED;
}
- // Notify scheduler about the app on recovery
- new AddApplicationToSchedulerTransition().transition(app, event);
-
// No existent attempts means the attempt associated with this app was not
// started or started but not yet saved.
if (app.attempts.isEmpty()) {
@@ -1055,8 +1055,12 @@ public class RMAppImpl implements RMApp,
if (app.finishTime == 0 ) {
app.finishTime = System.currentTimeMillis();
}
- app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId,
- finalState));
+ // Recovered apps that are completed were not added to scheduler, so no
+ // need to remove them from scheduler.
+ if (app.recoveredFinalState == null) {
+ app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId,
+ finalState));
+ }
app.handler.handle(
new RMAppManagerEvent(app.applicationId,
RMAppManagerEventType.APP_COMPLETED));
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Mon Jun 16 18:13:57 2014
@@ -267,15 +267,17 @@ public class RMAppAttemptImpl implements
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.CONTAINER_FINISHED,
new FinalSavingTransition(
- new AMContainerCrashedTransition(), RMAppAttemptState.FAILED))
+ new AMContainerCrashedBeforeRunningTransition(), RMAppAttemptState.FAILED))
// Transitions from LAUNCHED State
.addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING,
RMAppAttemptEventType.REGISTERED, new AMRegisteredTransition())
- .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING,
+ .addTransition(RMAppAttemptState.LAUNCHED,
+ EnumSet.of(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING),
RMAppAttemptEventType.CONTAINER_FINISHED,
- new FinalSavingTransition(
- new AMContainerCrashedTransition(), RMAppAttemptState.FAILED))
+ new ContainerFinishedTransition(
+ new AMContainerCrashedBeforeRunningTransition(),
+ RMAppAttemptState.LAUNCHED))
.addTransition(
RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.EXPIRE,
@@ -302,7 +304,9 @@ public class RMAppAttemptImpl implements
RMAppAttemptState.RUNNING,
EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING),
RMAppAttemptEventType.CONTAINER_FINISHED,
- new ContainerFinishedTransition())
+ new ContainerFinishedTransition(
+ new AMContainerCrashedAtRunningTransition(),
+ RMAppAttemptState.RUNNING))
.addTransition(
RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.EXPIRE,
@@ -671,9 +675,7 @@ public class RMAppAttemptImpl implements
ApplicationResourceUsageReport report =
scheduler.getAppResourceUsageReport(this.getAppAttemptId());
if (report == null) {
- Resource none = Resource.newInstance(0, 0);
- report = ApplicationResourceUsageReport.newInstance(0, 0, none, none,
- none);
+ report = RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT;
}
return report;
} finally {
@@ -904,6 +906,12 @@ public class RMAppAttemptImpl implements
}
return appAttempt.recoveredFinalState;
} else {
+ // Add the current attempt to the scheduler.
+ if (appAttempt.rmContext.isWorkPreservingRecoveryEnabled()) {
+ appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
+ appAttempt.getAppAttemptId(), false));
+ }
+
/*
* Since the application attempt's final state is not saved that means
* for AM container (previous attempt) state must be one of these.
@@ -1207,17 +1215,16 @@ public class RMAppAttemptImpl implements
}
}
- private static final class AMContainerCrashedTransition extends
+ private static final class AMContainerCrashedBeforeRunningTransition extends
BaseFinalTransition {
- public AMContainerCrashedTransition() {
+ public AMContainerCrashedBeforeRunningTransition() {
super(RMAppAttemptState.FAILED);
}
@Override
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
-
RMAppAttemptContainerFinishedEvent finishEvent =
((RMAppAttemptContainerFinishedEvent)event);
@@ -1410,6 +1417,16 @@ public class RMAppAttemptImpl implements
implements
MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
+ // The transition To Do after attempt final state is saved.
+ private BaseTransition transitionToDo;
+ private RMAppAttemptState currentState;
+
+ public ContainerFinishedTransition(BaseTransition transitionToDo,
+ RMAppAttemptState currentState) {
+ this.transitionToDo = transitionToDo;
+ this.currentState = currentState;
+ }
+
@Override
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
@@ -1426,14 +1443,13 @@ public class RMAppAttemptImpl implements
containerStatus.getContainerId())) {
// Remember the follow up transition and save the final attempt state.
appAttempt.rememberTargetTransitionsAndStoreState(event,
- new ContainerFinishedFinalStateSavedTransition(),
- RMAppAttemptState.FAILED, RMAppAttemptState.FAILED);
+ transitionToDo, RMAppAttemptState.FAILED, RMAppAttemptState.FAILED);
return RMAppAttemptState.FINAL_SAVING;
}
// Normal container.Put it in completedcontainers list
appAttempt.justFinishedContainers.add(containerStatus);
- return RMAppAttemptState.RUNNING;
+ return this.currentState;
}
}
@@ -1451,7 +1467,7 @@ public class RMAppAttemptImpl implements
}
}
- private static class ContainerFinishedFinalStateSavedTransition extends
+ private static class AMContainerCrashedAtRunningTransition extends
BaseTransition {
@Override
public void
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java Mon Jun 16 18:13:57 2014
@@ -33,5 +33,7 @@ public enum RMContainerEventType {
RELEASED,
// Source: ContainerAllocationExpirer
- EXPIRE
+ EXPIRE,
+
+ RECOVER
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java Mon Jun 16 18:13:57 2014
@@ -35,12 +35,14 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
@@ -65,6 +67,9 @@ public class RMContainerImpl implements
RMContainerEventType.KILL)
.addTransition(RMContainerState.NEW, RMContainerState.RESERVED,
RMContainerEventType.RESERVED, new ContainerReservedTransition())
+ .addTransition(RMContainerState.NEW,
+ EnumSet.of(RMContainerState.RUNNING, RMContainerState.COMPLETED),
+ RMContainerEventType.RECOVER, new ContainerRecoveredTransition())
// Transitions from RESERVED state
.addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED,
@@ -341,6 +346,38 @@ public class RMContainerImpl implements
}
}
+ private static final class ContainerRecoveredTransition
+ implements
+ MultipleArcTransition<RMContainerImpl, RMContainerEvent, RMContainerState> {
+ @Override
+ public RMContainerState transition(RMContainerImpl container,
+ RMContainerEvent event) {
+ NMContainerStatus report =
+ ((RMContainerRecoverEvent) event).getContainerReport();
+ if (report.getContainerState().equals(ContainerState.COMPLETE)) {
+ ContainerStatus status =
+ ContainerStatus.newInstance(report.getContainerId(),
+ report.getContainerState(), report.getDiagnostics(),
+ report.getContainerExitStatus());
+
+ new FinishedTransition().transition(container,
+ new RMContainerFinishedEvent(container.containerId, status,
+ RMContainerEventType.FINISHED));
+ return RMContainerState.COMPLETED;
+ } else if (report.getContainerState().equals(ContainerState.RUNNING)) {
+ // Tell the appAttempt
+ container.eventHandler.handle(new RMAppAttemptContainerAcquiredEvent(
+ container.getApplicationAttemptId(), container.getContainer()));
+ return RMContainerState.RUNNING;
+ } else {
+ // This can never happen.
+ LOG.warn("RMContainer received unexpected recover event with container"
+ + " state " + report.getContainerState() + " while recovering.");
+ return RMContainerState.RUNNING;
+ }
+ }
+ }
+
private static final class ContainerReservedTransition extends
BaseTransition {
@@ -398,7 +435,6 @@ public class RMContainerImpl implements
// Inform AppAttempt
container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent(
container.appAttemptId, finishedEvent.getRemoteContainerStatus()));
-
container.rmContext.getRMApplicationHistoryWriter()
.containerFinished(container);
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Mon Jun 16 18:13:57 2014
@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
@@ -460,13 +461,9 @@ public class RMNodeImpl implements RMNod
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Inform the scheduler
+ RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event;
+ List<NMContainerStatus> containers = null;
- rmNode.context.getDispatcher().getEventHandler().handle(
- new NodeAddedSchedulerEvent(rmNode));
- rmNode.context.getDispatcher().getEventHandler().handle(
- new NodesListManagerEvent(
- NodesListManagerEventType.NODE_USABLE, rmNode));
-
String host = rmNode.nodeId.getHost();
if (rmNode.context.getInactiveRMNodes().containsKey(host)) {
// Old node rejoining
@@ -476,10 +473,17 @@ public class RMNodeImpl implements RMNod
} else {
// Increment activeNodes explicitly because this is a new node.
ClusterMetrics.getMetrics().incrNumActiveNodes();
+ containers = startEvent.getContainerRecoveryReports();
}
+
+ rmNode.context.getDispatcher().getEventHandler()
+ .handle(new NodeAddedSchedulerEvent(rmNode, containers));
+ rmNode.context.getDispatcher().getEventHandler().handle(
+ new NodesListManagerEvent(
+ NodesListManagerEventType.NODE_USABLE, rmNode));
}
}
-
+
public static class ReconnectNodeTransition implements
SingleArcTransition<RMNodeImpl, RMNodeEvent> {
@@ -513,7 +517,7 @@ public class RMNodeImpl implements RMNod
}
rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
rmNode.context.getDispatcher().getEventHandler().handle(
- new RMNodeEvent(newNode.getNodeID(), RMNodeEventType.STARTED));
+ new RMNodeStartedEvent(newNode.getNodeID(), null));
}
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Mon Jun 16 18:13:57 2014
@@ -26,28 +26,35 @@ import java.util.concurrent.ConcurrentHa
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
+@SuppressWarnings("unchecked")
public abstract class AbstractYarnScheduler
<T extends SchedulerApplicationAttempt, N extends SchedulerNode>
- implements ResourceScheduler {
+ extends AbstractService implements ResourceScheduler {
private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class);
// Nodes in the cluster, indexed by NodeId
- protected Map<NodeId, N> nodes =
- new ConcurrentHashMap<NodeId, N>();
+ protected Map<NodeId, N> nodes = new ConcurrentHashMap<NodeId, N>();
// Whole capacity of the cluster
protected Resource clusterResource = Resource.newInstance(0, 0);
@@ -57,11 +64,21 @@ public abstract class AbstractYarnSchedu
protected RMContext rmContext;
protected Map<ApplicationId, SchedulerApplication<T>> applications;
+
protected final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
protected static final Allocation EMPTY_ALLOCATION = new Allocation(
EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
+  /**
+   * Creates the scheduler as a named {@link AbstractService}.
+   *
+   * @param name the service name handed to {@link AbstractService}
+   */
+  public AbstractYarnScheduler(String name) {
+    super(name);
+  }
+
public synchronized List<Container> getTransferredContainers(
ApplicationAttemptId currentAttempt) {
ApplicationId appId = currentAttempt.getApplicationId();
@@ -159,4 +176,90 @@ public abstract class AbstractYarnSchedu
throw new YarnException(getClass().getSimpleName()
+ " does not support moving apps between queues");
}
+
+  /**
+   * Asks the given node to clean up a container the scheduler will not
+   * recover. Containers already reported as COMPLETE need no cleanup and are
+   * ignored.
+   */
+  private void killOrphanContainerOnNode(RMNode node,
+      NMContainerStatus container) {
+    if (container.getContainerState().equals(ContainerState.COMPLETE)) {
+      return;
+    }
+    RMNodeCleanContainerEvent cleanup = new RMNodeCleanContainerEvent(
+        node.getNodeID(), container.getContainerId());
+    this.rmContext.getDispatcher().getEventHandler().handle(cleanup);
+  }
+
+  /**
+   * Rebuilds scheduler state for the containers a node reported, when
+   * work-preserving recovery is enabled.
+   * <p>
+   * A container is recovered only if its application is known to the RM, is
+   * not an unmanaged-AM application, is known to the scheduler and has a
+   * current scheduler attempt. Every other container is treated as an orphan:
+   * unless already complete, the node is asked to clean it up.
+   *
+   * @param containerReports container statuses reported by the node; may be
+   *          {@code null} or empty, in which case nothing happens
+   * @param nm the node that reported the containers
+   */
+  public synchronized void recoverContainersOnNode(
+      List<NMContainerStatus> containerReports, RMNode nm) {
+    if (!rmContext.isWorkPreservingRecoveryEnabled()
+        || containerReports == null || containerReports.isEmpty()) {
+      return;
+    }
+
+    // Loop-invariant: resolve the scheduler's view of the node once. Without
+    // it no container can be attached to the node, so bail out before any
+    // RMContainer is created and transitioned.
+    N schedulerNode = nodes.get(nm.getNodeID());
+    if (schedulerNode == null) {
+      LOG.error("Skip recovering containers on unknown node " + nm.getNodeID());
+      return;
+    }
+
+    for (NMContainerStatus container : containerReports) {
+      ApplicationId appId =
+          container.getContainerId().getApplicationAttemptId().getApplicationId();
+      RMApp rmApp = rmContext.getRMApps().get(appId);
+      if (rmApp == null) {
+        LOG.error("Skip recovering container " + container
+            + " for unknown application.");
+        killOrphanContainerOnNode(nm, container);
+        continue;
+      }
+
+      // Unmanaged AM recovery is addressed in YARN-1815
+      if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) {
+        LOG.info("Skip recovering container " + container
+            + " for unmanaged AM " + rmApp.getApplicationId());
+        killOrphanContainerOnNode(nm, container);
+        continue;
+      }
+
+      SchedulerApplication<T> schedulerApp = applications.get(appId);
+      if (schedulerApp == null) {
+        LOG.info("Skip recovering container " + container
+            + " for unknown SchedulerApplication. Application current state is "
+            + rmApp.getState());
+        killOrphanContainerOnNode(nm, container);
+        continue;
+      }
+
+      SchedulerApplicationAttempt schedulerAttempt =
+          schedulerApp.getCurrentAppAttempt();
+      if (schedulerAttempt == null) {
+        // No attempt means no queue or attempt to charge the container to;
+        // dereferencing it below would throw NullPointerException.
+        LOG.info("Skip recovering container " + container
+            + " for application " + appId + " with no current attempt.");
+        killOrphanContainerOnNode(nm, container);
+        continue;
+      }
+
+      LOG.info("Recovering container " + container);
+
+      // create container
+      RMContainer rmContainer = recoverAndCreateContainer(container, nm);
+
+      // recover RMContainer
+      rmContainer.handle(new RMContainerRecoverEvent(container.getContainerId(),
+          container));
+
+      // recover scheduler node
+      schedulerNode.recoverContainer(rmContainer);
+
+      // recover queue: update headroom etc.
+      Queue queue = schedulerAttempt.getQueue();
+      queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer);
+
+      // recover scheduler attempt
+      schedulerAttempt.recoverContainer(rmContainer);
+    }
+  }
+
+  /**
+   * Builds a fresh RMContainer for a container reported by a node. It uses
+   * the reported id, allocated resource and priority, and the user of the
+   * owning scheduler application.
+   */
+  private RMContainer recoverAndCreateContainer(NMContainerStatus report,
+      RMNode node) {
+    NodeId nodeId = node.getNodeID();
+    Container recoveredContainer = Container.newInstance(
+        report.getContainerId(), nodeId, node.getHttpAddress(),
+        report.getAllocatedResource(), report.getPriority(), null);
+    ApplicationAttemptId attemptId =
+        recoveredContainer.getId().getApplicationAttemptId();
+    String user = applications.get(attemptId.getApplicationId()).getUser();
+    return new RMContainerImpl(recoveredContainer, attemptId, nodeId, user,
+        rmContext);
+  }
+
+ public SchedulerNode getSchedulerNode(NodeId nodeId) {
+ return nodes.get(nodeId);
+ }
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Mon Jun 16 18:13:57 2014
@@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
@@ -409,4 +411,25 @@ public class AppSchedulingInfo {
// this.requests = appInfo.getRequests();
this.blacklist = appInfo.getBlackList();
}
+
+  /**
+   * Updates this attempt's scheduling bookkeeping for a recovered container.
+   * The first recovered container moves the attempt out of the pending state;
+   * only non-completed containers are charged to the queue metrics.
+   *
+   * @param rmContainer the recovered container
+   */
+  public synchronized void recoverContainer(RMContainer rmContainer) {
+    // ContainerIdCounter on recovery will be addressed in YARN-2052
+    this.containerIdCounter.incrementAndGet();
+
+    QueueMetrics queueMetrics = queue.getMetrics();
+    if (pending) {
+      // Having a container to recover means the scheduler already considered
+      // this attempt running, so it leaves the pending state here.
+      pending = false;
+      queueMetrics.runAppAttempt(applicationId, user);
+    }
+
+    // Completed containers no longer hold resources, so nothing to charge.
+    boolean completed =
+        rmContainer.getState().equals(RMContainerState.COMPLETED);
+    if (!completed) {
+      queueMetrics.allocateResources(user, 1,
+          rmContainer.getAllocatedResource(), false);
+    }
+  }
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java Mon Jun 16 18:13:57 2014
@@ -26,6 +26,8 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@Evolving
@LimitedPrivate("yarn")
@@ -60,4 +62,13 @@ public interface Queue {
boolean hasAccess(QueueACL acl, UserGroupInformation user);
public ActiveUsersManager getActiveUsersManager();
+
+  /**
+   * Update this queue's state to account for a recovered container.
+   * @param clusterResource the resource of the cluster
+   * @param schedulerAttempt the application attempt to which the container
+   *        was allocated
+   * @param rmContainer the container that was recovered
+   */
+  public void recoverContainer(Resource clusterResource,
+      SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer);
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java Mon Jun 16 18:13:57 2014
@@ -34,6 +34,15 @@ import org.apache.hadoop.yarn.server.res
@LimitedPrivate("yarn")
@Evolving
public interface ResourceScheduler extends YarnScheduler, Recoverable {
+
+  /**
+   * Set the {@link RMContext} for this <code>ResourceScheduler</code>.
+   * This method should be called exactly once, immediately after the
+   * scheduler is instantiated.
+   * @param rmContext created by ResourceManager
+   */
+  void setRMContext(RMContext rmContext);
+
/**
* Re-initialize the <code>ResourceScheduler</code>.
* @param conf configuration
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Mon Jun 16 18:13:57 2014
@@ -32,6 +32,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NMToken;
@@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -76,6 +78,9 @@ public class SchedulerApplicationAttempt
protected final Resource currentReservation = Resource.newInstance(0, 0);
private Resource resourceLimit = Resource.newInstance(0, 0);
protected Resource currentConsumption = Resource.newInstance(0, 0);
+ private Resource amResource;
+ private boolean unmanagedAM = true;
+ private boolean amRunning = false;
protected List<RMContainer> newlyAllocatedContainers =
new ArrayList<RMContainer>();
@@ -106,6 +111,17 @@ public class SchedulerApplicationAttempt
new AppSchedulingInfo(applicationAttemptId, user, queue,
activeUsersManager);
this.queue = queue;
+
+ if (rmContext != null && rmContext.getRMApps() != null &&
+ rmContext.getRMApps()
+ .containsKey(applicationAttemptId.getApplicationId())) {
+ ApplicationSubmissionContext appSubmissionContext =
+ rmContext.getRMApps().get(applicationAttemptId.getApplicationId())
+ .getApplicationSubmissionContext();
+ if (appSubmissionContext != null) {
+ unmanagedAM = appSubmissionContext.getUnmanagedAM();
+ }
+ }
}
/**
@@ -168,6 +184,26 @@ public class SchedulerApplicationAttempt
return appSchedulingInfo.getQueueName();
}
+  /** @return the resource recorded for this attempt's AM container */
+  public Resource getAMResource() {
+    return this.amResource;
+  }
+
+  /** @param amResource the resource to record for this attempt's AM */
+  public void setAMResource(Resource amResource) {
+    this.amResource = amResource;
+  }
+
+  /** @return whether this attempt's AM is currently flagged as running */
+  public boolean isAmRunning() {
+    return this.amRunning;
+  }
+
+  /** @param running new value of the AM-running flag */
+  public void setAmRunning(boolean running) {
+    this.amRunning = running;
+  }
+
+  /**
+   * @return whether the application was submitted with an unmanaged AM;
+   *     {@code true} when no submission context was available at construction
+   */
+  public boolean getUnmanagedAM() {
+    return this.unmanagedAM;
+  }
+
public synchronized RMContainer getRMContainer(ContainerId id) {
return liveContainers.get(id);
}
@@ -499,5 +535,24 @@ public class SchedulerApplicationAttempt
appSchedulingInfo.move(newQueue);
this.queue = newQueue;
- }
+ }
+
+  /**
+   * Re-registers a recovered container with this attempt. Scheduling info is
+   * always updated; completed containers are neither tracked as live nor
+   * added to current consumption.
+   *
+   * @param rmContainer the recovered container
+   */
+  public synchronized void recoverContainer(RMContainer rmContainer) {
+    appSchedulingInfo.recoverContainer(rmContainer);
+
+    boolean completed =
+        rmContainer.getState().equals(RMContainerState.COMPLETED);
+    if (completed) {
+      return;
+    }
+
+    ContainerId containerId = rmContainer.getContainerId();
+    LOG.info("SchedulerAttempt " + getApplicationAttemptId()
+        + " is recovering container " + containerId);
+    liveContainers.put(containerId, rmContainer);
+    Resources.addTo(currentConsumption,
+        rmContainer.getContainer().getResource());
+    // resourceLimit is updated separately, when
+    // LeafQueue#recoverContainer calls allocateResource.
+    // Not recovered here: newlyAllocatedContainers, schedulingOpportunities
+    // and lastScheduledContainer.
+  }
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java Mon Jun 16 18:13:57 2014
@@ -27,7 +27,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -35,10 +34,10 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.util.resource.Resources;
-import com.google.common.base.Preconditions;
/**
* Represents a YARN Cluster Node from the viewpoint of the scheduler.
@@ -119,13 +118,10 @@ public abstract class SchedulerNode {
* The Scheduler has allocated containers on this node to the given
* application.
*
- * @param applicationId
- * application
* @param rmContainer
* allocated container
*/
- public synchronized void allocateContainer(ApplicationId applicationId,
- RMContainer rmContainer) {
+ public synchronized void allocateContainer(RMContainer rmContainer) {
Container container = rmContainer.getContainer();
deductAvailableResource(container.getResource());
++numContainers;
@@ -166,8 +162,8 @@ public abstract class SchedulerNode {
return this.totalResourceCapability;
}
- private synchronized boolean isValidContainer(Container c) {
- if (launchedContainers.containsKey(c.getId())) {
+ public synchronized boolean isValidContainer(ContainerId containerId) {
+ if (launchedContainers.containsKey(containerId)) {
return true;
}
return false;
@@ -185,7 +181,7 @@ public abstract class SchedulerNode {
* container to be released
*/
public synchronized void releaseContainer(Container container) {
- if (!isValidContainer(container)) {
+ if (!isValidContainer(container.getId())) {
LOG.error("Invalid container released " + container);
return;
}
@@ -274,4 +270,12 @@ public abstract class SchedulerNode {
// we can only adjust available resource if total resource is changed.
Resources.addTo(this.availableResource, deltaResource);
}
+
+
+  /**
+   * Re-applies a recovered container to this node via
+   * {@link #allocateContainer(RMContainer)}; completed containers are ignored.
+   *
+   * @param rmContainer the recovered container
+   */
+  public synchronized void recoverContainer(RMContainer rmContainer) {
+    if (!rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+      allocateContainer(rmContainer);
+    }
+  }
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java Mon Jun 16 18:13:57 2014
@@ -28,7 +28,6 @@ import org.apache.hadoop.security.Access
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
@@ -235,15 +234,6 @@ extends org.apache.hadoop.yarn.server.re
public ActiveUsersManager getActiveUsersManager();
/**
- * Recover the state of the queue
- * @param clusterResource the resource of the cluster
- * @param application the application for which the container was allocated
- * @param container the container that was recovered.
- */
- public void recoverContainer(Resource clusterResource, FiCaSchedulerApp application,
- Container container);
-
- /**
* Adds all applications in the queue and its subqueues to the given collection.
* @param apps the collection to add the applications to
*/
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Mon Jun 16 18:13:57 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@@ -102,6 +103,8 @@ public class CapacityScheduler extends
private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
private CSQueue root;
+  // Maximum time, in milliseconds, to wait for the async scheduling thread
+  // to exit when this service is stopped. A true constant, so static.
+  protected static final long THREAD_JOIN_TIMEOUT_MS = 1000;
static final Comparator<CSQueue> queueComparator = new Comparator<CSQueue>() {
@Override
@@ -179,8 +182,6 @@ public class CapacityScheduler extends
private int numNodeManagers = 0;
- private boolean initialized = false;
-
private ResourceCalculator calculator;
private boolean usePortForNodeName;
@@ -196,7 +197,9 @@ public class CapacityScheduler extends
+ ".scheduling-interval-ms";
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
- public CapacityScheduler() {}
+ public CapacityScheduler() {
+ super(CapacityScheduler.class.getName());
+ }
@Override
public QueueMetrics getRootQueueMetrics() {
@@ -238,56 +241,91 @@ public class CapacityScheduler extends
}
@Override
- public RMContext getRMContext() {
+ public synchronized RMContext getRMContext() {
return this.rmContext;
}
-
+
@Override
- public synchronized void
- reinitialize(Configuration conf, RMContext rmContext) throws IOException {
+  public synchronized void setRMContext(RMContext rmContext) {
+    this.rmContext = rmContext;
+  }
+
+  /**
+   * One-time scheduler setup, called from serviceInit: loads and validates
+   * the capacity-scheduler configuration, caches allocation settings, builds
+   * the queue hierarchy and, when asynchronous scheduling is enabled,
+   * creates (without starting) the async scheduling thread.
+   *
+   * @param configuration base configuration to load scheduler settings from
+   * @throws IOException if initialization fails
+   */
+  private synchronized void initScheduler(Configuration configuration)
+      throws IOException {
+    CapacitySchedulerConfiguration csConf =
+        loadCapacitySchedulerConfiguration(configuration);
+    this.conf = csConf;
+    validateConf(csConf);
+    this.minimumAllocation = csConf.getMinimumAllocation();
+    this.maximumAllocation = csConf.getMaximumAllocation();
+    this.calculator = csConf.getResourceCalculator();
+    this.usePortForNodeName = csConf.getUsePortForNodeName();
+    this.applications =
+        new ConcurrentHashMap<ApplicationId,
+            SchedulerApplication<FiCaSchedulerApp>>();
+
+    initializeQueues(csConf);
+
+    scheduleAsynchronously = csConf.getScheduleAynschronously();
+    asyncScheduleInterval = csConf.getLong(ASYNC_SCHEDULER_INTERVAL,
+        DEFAULT_ASYNC_SCHEDULER_INTERVAL);
+    if (scheduleAsynchronously) {
+      // Only created here; startSchedulerThreads() starts it.
+      asyncSchedulerThread = new AsyncScheduleThread(this);
+    }
+
+    LOG.info("Initialized CapacityScheduler with "
+        + "calculator=" + getResourceCalculator().getClass() + ", "
+        + "minimumAllocation=<" + getMinimumResourceCapability() + ">, "
+        + "maximumAllocation=<" + getMaximumResourceCapability() + ">, "
+        + "asynchronousScheduling=" + scheduleAsynchronously + ", "
+        + "asyncScheduleInterval=" + asyncScheduleInterval + "ms");
+  }
+
+  /** Starts the async scheduling thread when asynchronous scheduling is on. */
+  private synchronized void startSchedulerThreads() {
+    if (!scheduleAsynchronously) {
+      return;
+    }
+    Preconditions.checkNotNull(asyncSchedulerThread,
+        "asyncSchedulerThread is null");
+    asyncSchedulerThread.start();
+  }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
Configuration configuration = new Configuration(conf);
- if (!initialized) {
- this.rmContext = rmContext;
- this.conf = loadCapacitySchedulerConfiguration(configuration);
- validateConf(this.conf);
- this.minimumAllocation = this.conf.getMinimumAllocation();
- this.maximumAllocation = this.conf.getMaximumAllocation();
- this.calculator = this.conf.getResourceCalculator();
- this.usePortForNodeName = this.conf.getUsePortForNodeName();
- this.applications =
- new ConcurrentHashMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
+ initScheduler(configuration);
+ super.serviceInit(conf);
+ }
- initializeQueues(this.conf);
-
- scheduleAsynchronously = this.conf.getScheduleAynschronously();
- asyncScheduleInterval =
- this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
- DEFAULT_ASYNC_SCHEDULER_INTERVAL);
- if (scheduleAsynchronously) {
- asyncSchedulerThread = new AsyncScheduleThread(this);
- asyncSchedulerThread.start();
- }
-
- initialized = true;
- LOG.info("Initialized CapacityScheduler with " +
- "calculator=" + getResourceCalculator().getClass() + ", " +
- "minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
- "maximumAllocation=<" + getMaximumResourceCapability() + ">, " +
- "asynchronousScheduling=" + scheduleAsynchronously + ", " +
- "asyncScheduleInterval=" + asyncScheduleInterval + "ms");
-
- } else {
- CapacitySchedulerConfiguration oldConf = this.conf;
- this.conf = loadCapacitySchedulerConfiguration(configuration);
- validateConf(this.conf);
- try {
- LOG.info("Re-initializing queues...");
- reinitializeQueues(this.conf);
- } catch (Throwable t) {
- this.conf = oldConf;
- throw new IOException("Failed to re-init queues", t);
+  /** Starts the scheduler's background threads, then the base service. */
+  @Override
+  public void serviceStart() throws Exception {
+    startSchedulerThreads();
+    super.serviceStart();
+  }
+
+  /**
+   * Stops the async scheduling thread (if any), then the base service.
+   * The thread reference is read under this scheduler's lock, but the
+   * interrupt/join happens outside it. Joining while holding the monitor
+   * could stall for the full THREAD_JOIN_TIMEOUT_MS if the async thread
+   * needs the same monitor to finish its current pass.
+   * NOTE(review): that depends on AsyncScheduleThread, which is not shown.
+   */
+  @Override
+  public void serviceStop() throws Exception {
+    Thread schedulerThread = null;
+    synchronized (this) {
+      if (scheduleAsynchronously && asyncSchedulerThread != null) {
+        schedulerThread = asyncSchedulerThread;
 }
 }
+    if (schedulerThread != null) {
+      schedulerThread.interrupt();
+      schedulerThread.join(THREAD_JOIN_TIMEOUT_MS);
+    }
+    super.serviceStop();
+  }
+
+ @Override
+ public synchronized void
+ reinitialize(Configuration conf, RMContext rmContext) throws IOException {
+ Configuration configuration = new Configuration(conf);
+ CapacitySchedulerConfiguration oldConf = this.conf;
+ this.conf = loadCapacitySchedulerConfiguration(configuration);
+ validateConf(this.conf);
+ try {
+ LOG.info("Re-initializing queues...");
+ reinitializeQueues(this.conf);
+ } catch (Throwable t) {
+ this.conf = oldConf;
+ throw new IOException("Failed to re-init queues", t);
+ }
}
long getAsyncScheduleInterval() {
@@ -834,6 +872,8 @@ public class CapacityScheduler extends
{
NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
addNode(nodeAddedEvent.getAddedRMNode());
+ recoverContainersOnNode(nodeAddedEvent.getContainerReports(),
+ nodeAddedEvent.getAddedRMNode());
}
break;
case NODE_REMOVED:
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Mon Jun 16 18:13:57 2014
@@ -59,15 +59,17 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
+import com.google.common.annotations.VisibleForTesting;
+
@Private
@Unstable
public class LeafQueue implements CSQueue {
@@ -564,7 +566,8 @@ public class LeafQueue implements CSQueu
"numContainers=" + getNumContainers();
}
- private synchronized User getUser(String userName) {
+ @VisibleForTesting
+ public synchronized User getUser(String userName) {
User user = users.get(userName);
if (user == null) {
user = new User();
@@ -1346,8 +1349,7 @@ public class LeafQueue implements CSQueu
}
// Inform the node
- node.allocateContainer(application.getApplicationId(),
- allocatedContainer);
+ node.allocateContainer(allocatedContainer);
LOG.info("assignedContainer" +
" application attempt=" + application.getApplicationAttemptId() +
@@ -1446,7 +1448,7 @@ public class LeafQueue implements CSQueu
}
synchronized void allocateResource(Resource clusterResource,
- FiCaSchedulerApp application, Resource resource) {
+ SchedulerApplicationAttempt application, Resource resource) {
// Update queue metrics
Resources.addTo(usedResources, resource);
CSQueueUtils.updateQueueStatistics(
@@ -1530,7 +1532,8 @@ public class LeafQueue implements CSQueu
return metrics;
}
- static class User {
+ @VisibleForTesting
+ public static class User {
Resource consumed = Resources.createResource(0, 0);
int pendingApplications = 0;
int activeApplications = 0;
@@ -1580,13 +1583,16 @@ public class LeafQueue implements CSQueu
@Override
public void recoverContainer(Resource clusterResource,
- FiCaSchedulerApp application, Container container) {
+ SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
+ if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+ return;
+ }
// Careful! Locking order is important!
synchronized (this) {
- allocateResource(clusterResource, application, container.getResource());
+ allocateResource(clusterResource, attempt, rmContainer.getContainer()
+ .getResource());
}
- getParent().recoverContainer(clusterResource, application, container);
-
+ getParent().recoverContainer(clusterResource, attempt, rmContainer);
}
/**
@@ -1613,5 +1619,4 @@ public class LeafQueue implements CSQueu
apps.add(app.getApplicationAttemptId());
}
}
-
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Mon Jun 16 18:13:57 2014
@@ -38,7 +38,6 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -49,9 +48,11 @@ import org.apache.hadoop.yarn.factories.
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -770,13 +771,16 @@ public class ParentQueue implements CSQu
@Override
public void recoverContainer(Resource clusterResource,
- FiCaSchedulerApp application, Container container) {
+ SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
+ if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+ return;
+ }
// Careful! Locking order is important!
synchronized (this) {
- allocateResource(clusterResource, container.getResource());
+ allocateResource(clusterResource,rmContainer.getContainer().getResource());
}
if (parent != null) {
- parent.recoverContainer(clusterResource, application, container);
+ parent.recoverContainer(clusterResource, attempt, rmContainer);
}
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java Mon Jun 16 18:13:57 2014
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java Mon Jun 16 18:13:57 2014
@@ -18,19 +18,34 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+import java.util.List;
+
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
public class NodeAddedSchedulerEvent extends SchedulerEvent {
private final RMNode rmNode;
+ private final List<NMContainerStatus> containerReports;
public NodeAddedSchedulerEvent(RMNode rmNode) {
super(SchedulerEventType.NODE_ADDED);
this.rmNode = rmNode;
+ this.containerReports = null;
+ }
+
+ public NodeAddedSchedulerEvent(RMNode rmNode,
+ List<NMContainerStatus> containerReports) {
+ super(SchedulerEventType.NODE_ADDED);
+ this.rmNode = rmNode;
+ this.containerReports = containerReports;
}
public RMNode getAddedRMNode() {
return rmNode;
}
+ public List<NMContainerStatus> getContainerReports() {
+ return containerReports;
+ }
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java Mon Jun 16 18:13:57 2014
@@ -53,6 +53,10 @@ public class AllocationConfiguration {
private final int userMaxAppsDefault;
private final int queueMaxAppsDefault;
+ // Maximum resource share for each leaf queue that can be used to run AMs
+ final Map<String, Float> queueMaxAMShares;
+ private final float queueMaxAMShareDefault;
+
// ACL's for each queue. Only specifies non-default ACL's from configuration.
private final Map<String, Map<QueueACL, AccessControlList>> queueAcls;
@@ -84,8 +88,9 @@ public class AllocationConfiguration {
public AllocationConfiguration(Map<String, Resource> minQueueResources,
Map<String, Resource> maxQueueResources,
Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
- Map<String, ResourceWeights> queueWeights, int userMaxAppsDefault,
- int queueMaxAppsDefault,
+ Map<String, ResourceWeights> queueWeights,
+ Map<String, Float> queueMaxAMShares, int userMaxAppsDefault,
+ int queueMaxAppsDefault, float queueMaxAMShareDefault,
Map<String, SchedulingPolicy> schedulingPolicies,
SchedulingPolicy defaultSchedulingPolicy,
Map<String, Long> minSharePreemptionTimeouts,
@@ -97,9 +102,11 @@ public class AllocationConfiguration {
this.maxQueueResources = maxQueueResources;
this.queueMaxApps = queueMaxApps;
this.userMaxApps = userMaxApps;
+ this.queueMaxAMShares = queueMaxAMShares;
this.queueWeights = queueWeights;
this.userMaxAppsDefault = userMaxAppsDefault;
this.queueMaxAppsDefault = queueMaxAppsDefault;
+ this.queueMaxAMShareDefault = queueMaxAMShareDefault;
this.defaultSchedulingPolicy = defaultSchedulingPolicy;
this.schedulingPolicies = schedulingPolicies;
this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
@@ -116,8 +123,10 @@ public class AllocationConfiguration {
queueWeights = new HashMap<String, ResourceWeights>();
queueMaxApps = new HashMap<String, Integer>();
userMaxApps = new HashMap<String, Integer>();
+ queueMaxAMShares = new HashMap<String, Float>();
userMaxAppsDefault = Integer.MAX_VALUE;
queueMaxAppsDefault = Integer.MAX_VALUE;
+ queueMaxAMShareDefault = 1.0f;
queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
minSharePreemptionTimeouts = new HashMap<String, Long>();
defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
@@ -184,6 +193,11 @@ public class AllocationConfiguration {
return (maxApps == null) ? queueMaxAppsDefault : maxApps;
}
+ public float getQueueMaxAMShare(String queue) {
+ Float maxAMShare = queueMaxAMShares.get(queue);
+ return (maxAMShare == null) ? queueMaxAMShareDefault : maxAMShare;
+ }
+
/**
* Get the minimum resource allocation for the given queue.
* @return the cap set on this queue, or 0 if not set.
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java Mon Jun 16 18:13:57 2014
@@ -68,7 +68,9 @@ public class AllocationFileLoaderService
* (this is done to prevent loading a file that hasn't been fully written).
*/
public static final long ALLOC_RELOAD_WAIT_MS = 5 * 1000;
-
+
+ public static final long THREAD_JOIN_TIMEOUT_MS = 1000;
+
private final Clock clock;
private long lastSuccessfulReload; // Last time we successfully reloaded queues
@@ -96,58 +98,69 @@ public class AllocationFileLoaderService
}
@Override
- public void init(Configuration conf) {
+ public void serviceInit(Configuration conf) throws Exception {
this.allocFile = getAllocationFile(conf);
- super.init(conf);
- }
-
- @Override
- public void start() {
- if (allocFile == null) {
- return;
- }
- reloadThread = new Thread() {
- public void run() {
- while (running) {
- long time = clock.getTime();
- long lastModified = allocFile.lastModified();
- if (lastModified > lastSuccessfulReload &&
- time > lastModified + ALLOC_RELOAD_WAIT_MS) {
- try {
- reloadAllocations();
- } catch (Exception ex) {
+ if (allocFile != null) {
+ reloadThread = new Thread() {
+ @Override
+ public void run() {
+ while (running) {
+ long time = clock.getTime();
+ long lastModified = allocFile.lastModified();
+ if (lastModified > lastSuccessfulReload &&
+ time > lastModified + ALLOC_RELOAD_WAIT_MS) {
+ try {
+ reloadAllocations();
+ } catch (Exception ex) {
+ if (!lastReloadAttemptFailed) {
+ LOG.error("Failed to reload fair scheduler config file - " +
+ "will use existing allocations.", ex);
+ }
+ lastReloadAttemptFailed = true;
+ }
+ } else if (lastModified == 0l) {
if (!lastReloadAttemptFailed) {
- LOG.error("Failed to reload fair scheduler config file - " +
- "will use existing allocations.", ex);
+ LOG.warn("Failed to reload fair scheduler config file because" +
+ " last modified returned 0. File exists: "
+ + allocFile.exists());
}
lastReloadAttemptFailed = true;
}
- } else if (lastModified == 0l) {
- if (!lastReloadAttemptFailed) {
- LOG.warn("Failed to reload fair scheduler config file because" +
- " last modified returned 0. File exists: " + allocFile.exists());
+ try {
+ Thread.sleep(reloadIntervalMs);
+ } catch (InterruptedException ex) {
+ LOG.info(
+ "Interrupted while waiting to reload alloc configuration");
}
- lastReloadAttemptFailed = true;
- }
- try {
- Thread.sleep(reloadIntervalMs);
- } catch (InterruptedException ex) {
- LOG.info("Interrupted while waiting to reload alloc configuration");
}
}
- }
- };
- reloadThread.setName("AllocationFileReloader");
- reloadThread.setDaemon(true);
- reloadThread.start();
- super.start();
+ };
+ reloadThread.setName("AllocationFileReloader");
+ reloadThread.setDaemon(true);
+ }
+ super.serviceInit(conf);
}
@Override
- public void stop() {
+ public void serviceStart() throws Exception {
+ if (reloadThread != null) {
+ reloadThread.start();
+ }
+ super.serviceStart();
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
running = false;
- reloadThread.interrupt();
- super.stop();
+ if (reloadThread != null) {
+ reloadThread.interrupt();
+ try {
+ reloadThread.join(THREAD_JOIN_TIMEOUT_MS);
+ } catch (InterruptedException e) {
+ LOG.warn("reloadThread fails to join.");
+ }
+ }
+ super.serviceStop();
}
/**
@@ -200,6 +213,7 @@ public class AllocationFileLoaderService
Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
+ Map<String, Float> queueMaxAMShares = new HashMap<String, Float>();
Map<String, ResourceWeights> queueWeights = new HashMap<String, ResourceWeights>();
Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
@@ -207,6 +221,7 @@ public class AllocationFileLoaderService
new HashMap<String, Map<QueueACL, AccessControlList>>();
int userMaxAppsDefault = Integer.MAX_VALUE;
int queueMaxAppsDefault = Integer.MAX_VALUE;
+ float queueMaxAMShareDefault = 1.0f;
long fairSharePreemptionTimeout = Long.MAX_VALUE;
long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY;
@@ -273,6 +288,11 @@ public class AllocationFileLoaderService
String text = ((Text)element.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
queueMaxAppsDefault = val;
+ } else if ("queueMaxAMShareDefault".equals(element.getTagName())) {
+ String text = ((Text)element.getFirstChild()).getData().trim();
+ float val = Float.parseFloat(text);
+ val = Math.min(val, 1.0f);
+ queueMaxAMShareDefault = val;
} else if ("defaultQueueSchedulingPolicy".equals(element.getTagName())
|| "defaultQueueSchedulingMode".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
@@ -297,8 +317,8 @@ public class AllocationFileLoaderService
parent = null;
}
loadQueue(parent, element, minQueueResources, maxQueueResources,
- queueMaxApps, userMaxApps, queueWeights, queuePolicies,
- minSharePreemptionTimeouts, queueAcls,
+ queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
+ queuePolicies, minSharePreemptionTimeouts, queueAcls,
configuredQueues);
}
@@ -313,8 +333,8 @@ public class AllocationFileLoaderService
}
AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources,
- queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
- queueMaxAppsDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts,
+ queueMaxApps, userMaxApps, queueWeights, queueMaxAMShares, userMaxAppsDefault,
+ queueMaxAppsDefault, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts,
queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout,
newPlacementPolicy, configuredQueues);
@@ -329,7 +349,8 @@ public class AllocationFileLoaderService
*/
private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
- Map<String, Integer> userMaxApps, Map<String, ResourceWeights> queueWeights,
+ Map<String, Integer> userMaxApps, Map<String, Float> queueMaxAMShares,
+ Map<String, ResourceWeights> queueWeights,
Map<String, SchedulingPolicy> queuePolicies,
Map<String, Long> minSharePreemptionTimeouts,
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
@@ -361,6 +382,11 @@ public class AllocationFileLoaderService
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
queueMaxApps.put(queueName, val);
+ } else if ("maxAMShare".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ float val = Float.parseFloat(text);
+ val = Math.min(val, 1.0f);
+ queueMaxAMShares.put(queueName, val);
} else if ("weight".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
double val = Double.parseDouble(text);
@@ -383,8 +409,9 @@ public class AllocationFileLoaderService
} else if ("queue".endsWith(field.getTagName()) ||
"pool".equals(field.getTagName())) {
loadQueue(queueName, field, minQueueResources, maxQueueResources,
- queueMaxApps, userMaxApps, queueWeights, queuePolicies,
- minSharePreemptionTimeouts, queueAcls, configuredQueues);
+ queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
+ queuePolicies, minSharePreemptionTimeouts, queueAcls,
+ configuredQueues);
configuredQueues.get(FSQueueType.PARENT).add(queueName);
isLeaf = false;
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java Mon Jun 16 18:13:57 2014
@@ -264,8 +264,14 @@ public class AppSchedulable extends Sche
}
// Inform the node
- node.allocateContainer(app.getApplicationId(),
- allocatedContainer);
+ node.allocateContainer(allocatedContainer);
+
+ // If this container is used to run AM, update the leaf queue's AM usage
+ if (app.getLiveContainers().size() == 1 &&
+ !app.getUnmanagedAM()) {
+ queue.addAMResourceUsage(container.getResource());
+ app.setAmRunning(true);
+ }
return container.getResource();
} else {
@@ -297,6 +303,14 @@ public class AppSchedulable extends Sche
app.addSchedulingOpportunity(priority);
+ // Check the AM resource usage for the leaf queue
+ if (app.getLiveContainers().size() == 0
+ && !app.getUnmanagedAM()) {
+ if (!queue.canRunAppAM(app.getAMResource())) {
+ return Resources.none();
+ }
+ }
+
ResourceRequest rackLocalRequest = app.getResourceRequest(priority,
node.getRackName());
ResourceRequest localRequest = app.getResourceRequest(priority,
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Mon Jun 16 18:13:57 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.util.resource.Resources;
@Private
@@ -55,6 +56,9 @@ public class FSLeafQueue extends FSQueue
private long lastTimeAtMinShare;
private long lastTimeAtHalfFairShare;
+ // Track the AM resource usage for this queue
+ private Resource amResourceUsage;
+
private final ActiveUsersManager activeUsersManager;
public FSLeafQueue(String name, FairScheduler scheduler,
@@ -63,6 +67,7 @@ public class FSLeafQueue extends FSQueue
this.lastTimeAtMinShare = scheduler.getClock().getTime();
this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
activeUsersManager = new ActiveUsersManager(getMetrics());
+ amResourceUsage = Resource.newInstance(0, 0);
}
public void addApp(FSSchedulerApp app, boolean runnable) {
@@ -86,6 +91,10 @@ public class FSLeafQueue extends FSQueue
*/
public boolean removeApp(FSSchedulerApp app) {
if (runnableAppScheds.remove(app.getAppSchedulable())) {
+ // Update AM resource usage
+ if (app.isAmRunning() && app.getAMResource() != null) {
+ Resources.subtractFrom(amResourceUsage, app.getAMResource());
+ }
return true;
} else if (nonRunnableAppScheds.remove(app.getAppSchedulable())) {
return false;
@@ -145,6 +154,10 @@ public class FSLeafQueue extends FSQueue
return usage;
}
+ public Resource getAmResourceUsage() {
+ return amResourceUsage;
+ }
+
@Override
public void updateDemand() {
// Compute demand by iterating through apps in the queue
@@ -284,4 +297,32 @@ public class FSLeafQueue extends FSQueue
public ActiveUsersManager getActiveUsersManager() {
return activeUsersManager;
}
+
+ /**
+ * Check whether this queue can run this application master under the
+ * maxAMShare limit
+ *
+ * @param amResource
+ * @return true if this queue can run
+ */
+ public boolean canRunAppAM(Resource amResource) {
+ float maxAMShare =
+ scheduler.getAllocationConfiguration().getQueueMaxAMShare(getName());
+ Resource maxAMResource = Resources.multiply(getFairShare(), maxAMShare);
+ Resource ifRunAMResource = Resources.add(amResourceUsage, amResource);
+ return !policy
+ .checkIfAMResourceUsageOverLimit(ifRunAMResource, maxAMResource);
+ }
+
+ public void addAMResourceUsage(Resource amResource) {
+ if (amResource != null) {
+ Resources.addTo(amResourceUsage, amResource);
+ }
+ }
+
+ @Override
+ public void recoverContainer(Resource clusterResource,
+ SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
+ // TODO Auto-generated method stub
+ }
}