You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2014/01/02 21:21:05 UTC
svn commit: r1554898 [1/3] - in
/hadoop/common/branches/branch-2/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/
hadoop-yarn/hadoop-yarn-server...
Author: vinodkv
Date: Thu Jan 2 20:21:03 2014
New Revision: 1554898
URL: http://svn.apache.org/r1554898
Log:
YARN-1493. Changed ResourceManager and Scheduler interfacing to recognize app-attempts separately from apps. Contributed by Jian He.
svn merge --ignore-ancestry -c 1554896 ../../trunk/
Added:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
- copied unchanged from r1554896, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
- copied unchanged from r1554896, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppRemovedSchedulerEvent.java
- copied unchanged from r1554896, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppRemovedSchedulerEvent.java
Removed:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptRejectedEvent.java
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppUtils.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1554898&r1=1554897&r2=1554898&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Thu Jan 2 20:21:03 2014
@@ -176,6 +176,9 @@ Release 2.4.0 - UNRELEASED
YARN-1541. Changed ResourceManager to invalidate ApplicationMaster host/port
information once an AM crashes. (Jian He via vinodkv)
+ YARN-1493. Changed ResourceManager and Scheduler interfacing to recognize
+ app-attempts separately from apps. (Jian He via vinodkv)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java?rev=1554898&r1=1554897&r2=1554898&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java Thu Jan 2 20:21:03 2014
@@ -24,9 +24,11 @@ public enum RMAppEventType {
RECOVER,
KILL,
- // Source: RMAppAttempt
+ // Source: Scheduler
APP_REJECTED,
APP_ACCEPTED,
+
+ // Source: RMAppAttempt
ATTEMPT_REGISTERED,
ATTEMPT_UNREGISTERED,
ATTEMPT_FINISHED, // Will send the final state
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1554898&r1=1554897&r2=1554898&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Thu Jan 2 20:21:03 2014
@@ -66,6 +66,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -136,7 +138,7 @@ public class RMAppImpl implements RMApp,
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
RMAppEventType.START, new RMAppNewlySavingTransition())
.addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
- RMAppState.RUNNING, RMAppState.FINISHED, RMAppState.FAILED,
+ RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED,
RMAppState.KILLED, RMAppState.FINAL_SAVING),
RMAppEventType.RECOVER, new RMAppRecoveredTransition())
.addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, RMAppEventType.KILL,
@@ -151,7 +153,7 @@ public class RMAppImpl implements RMApp,
.addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
- RMAppEventType.APP_NEW_SAVED, new StartAppAttemptTransition())
+ RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
RMAppEventType.KILL,
new FinalSavingTransition(
@@ -169,9 +171,12 @@ public class RMAppImpl implements RMApp,
new FinalSavingTransition(
new AppRejectedTransition(), RMAppState.FAILED))
.addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
- RMAppEventType.APP_ACCEPTED)
- .addTransition(RMAppState.SUBMITTED, RMAppState.KILLING,
- RMAppEventType.KILL,new KillAttemptTransition())
+ RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())
+ .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
+ RMAppEventType.KILL,
+ new FinalSavingTransition(
+ new AppKilledTransition(), RMAppState.KILLED))
+
// Transitions from ACCEPTED state
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
@@ -179,11 +184,22 @@ public class RMAppImpl implements RMApp,
.addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
RMAppEventType.ATTEMPT_REGISTERED)
.addTransition(RMAppState.ACCEPTED,
- EnumSet.of(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING),
+ EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
+ // The ACCEPTED state can receive an ATTEMPT_FAILED event because
+ // RMAppRecoveredTransition returns the ACCEPTED state directly and
+ // waits for the previous AM to exit.
RMAppEventType.ATTEMPT_FAILED,
- new AttemptFailedTransition(RMAppState.SUBMITTED))
- .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING,
- RMAppEventType.KILL,new KillAttemptTransition())
+ new AttemptFailedTransition(RMAppState.ACCEPTED))
+ .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
+ RMAppEventType.KILL,
+ new FinalSavingTransition(
+ new AppKilledTransition(), RMAppState.KILLED))
+ // The ACCEPTED state can once again receive an APP_ACCEPTED event, because
+ // on recovery the app returns to the ACCEPTED state, goes through the
+ // scheduler again, and triggers one more APP_ACCEPTED event while in the
+ // ACCEPTED state.
+ .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
+ RMAppEventType.APP_ACCEPTED)
// Transitions from RUNNING state
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
@@ -197,9 +213,9 @@ public class RMAppImpl implements RMApp,
// UnManagedAM directly jumps to finished
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
.addTransition(RMAppState.RUNNING,
- EnumSet.of(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING),
+ EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
RMAppEventType.ATTEMPT_FAILED,
- new AttemptFailedTransition(RMAppState.SUBMITTED))
+ new AttemptFailedTransition(RMAppState.ACCEPTED))
.addTransition(RMAppState.RUNNING, RMAppState.KILLING,
RMAppEventType.KILL, new KillAttemptTransition())
@@ -641,7 +657,7 @@ public class RMAppImpl implements RMApp,
ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
RMAppAttempt attempt =
new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
- submissionContext, conf, user);
+ submissionContext, conf);
attempts.put(appAttemptId, attempt);
currentAttempt = attempt;
if(startAttempt) {
@@ -695,29 +711,46 @@ public class RMAppImpl implements RMApp,
return app.recoveredFinalState;
}
+ // Notify scheduler about the app on recovery
+ new AddApplicationToSchedulerTransition().transition(app, event);
+
// No existent attempts means the attempt associated with this app was not
// started or started but not yet saved.
if (app.attempts.isEmpty()) {
- app.createNewAttempt(true);
return RMAppState.SUBMITTED;
}
- return RMAppState.RUNNING;
+ // YARN-1507 saves the application state after the application is
+ // accepted, so after YARN-1507 a saved app is by definition accepted.
+ // Thus we return the ACCEPTED state on recovery.
+ return RMAppState.ACCEPTED;
}
}
- private static final class StartAppAttemptTransition extends RMAppTransition {
+ private static final class AddApplicationToSchedulerTransition extends
+ RMAppTransition {
+ @SuppressWarnings("unchecked")
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
- RMAppNewSavedEvent storeEvent = (RMAppNewSavedEvent) event;
- if (storeEvent.getStoredException() != null) {
+ if (event instanceof RMAppNewSavedEvent) {
+ RMAppNewSavedEvent storeEvent = (RMAppNewSavedEvent) event;
// For HA this exception needs to be handled by giving up
// master status if we got fenced
- LOG.error(
- "Failed to store application: " + storeEvent.getApplicationId(),
- storeEvent.getStoredException());
- ExitUtil.terminate(1, storeEvent.getStoredException());
+ if (((RMAppNewSavedEvent) event).getStoredException() != null) {
+ LOG.error(
+ "Failed to store application: " + storeEvent.getApplicationId(),
+ storeEvent.getStoredException());
+ ExitUtil.terminate(1, storeEvent.getStoredException());
+ }
}
+ app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
+ app.submissionContext.getQueue(), app.user));
+ }
+ }
+
+ private static final class StartAppAttemptTransition extends RMAppTransition {
+ @Override
+ public void transition(RMAppImpl app, RMAppEvent event) {
app.createNewAttempt(true);
};
}
@@ -965,6 +998,8 @@ public class RMAppImpl implements RMApp,
if (app.finishTime == 0 ) {
app.finishTime = System.currentTimeMillis();
}
+ app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId, app
+ .getState()));
app.handler.handle(
new RMAppManagerEvent(app.applicationId,
RMAppManagerEventType.APP_COMPLETED));
@@ -993,7 +1028,6 @@ public class RMAppImpl implements RMApp,
return RMAppState.FINAL_SAVING;
}
}
-
}
@Override
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java?rev=1554898&r1=1554897&r2=1554898&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java Thu Jan 2 20:21:03 2014
@@ -45,8 +45,7 @@ public enum RMAppAttemptEventType {
ATTEMPT_UPDATE_SAVED,
// Source: Scheduler
- APP_REJECTED,
- APP_ACCEPTED,
+ ATTEMPT_ADDED,
// Source: RMAttemptImpl.recover
RECOVER
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1554898&r1=1554897&r2=1554898&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Thu Jan 2 20:21:03 2014
@@ -75,13 +75,11 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
@@ -150,7 +148,6 @@ public class RMAppAttemptImpl implements
private final StringBuilder diagnostics = new StringBuilder();
private Configuration conf;
- private String user;
private static final ExpiredTransition EXPIRED_TRANSITION =
new ExpiredTransition();
@@ -186,14 +183,10 @@ public class RMAppAttemptImpl implements
RMAppAttemptEventType.RECOVER, new AttemptRecoveredTransition())
// Transitions from SUBMITTED state
- .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FINAL_SAVING,
- RMAppAttemptEventType.APP_REJECTED,
- new FinalSavingTransition(new AppRejectedTransition(),
- RMAppAttemptState.FAILED))
.addTransition(RMAppAttemptState.SUBMITTED,
EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
RMAppAttemptState.SCHEDULED),
- RMAppAttemptEventType.APP_ACCEPTED,
+ RMAppAttemptEventType.ATTEMPT_ADDED,
new ScheduleTransition())
.addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.KILL,
@@ -380,8 +373,7 @@ public class RMAppAttemptImpl implements
.addTransition(
RMAppAttemptState.KILLED,
RMAppAttemptState.KILLED,
- EnumSet.of(RMAppAttemptEventType.APP_ACCEPTED,
- RMAppAttemptEventType.APP_REJECTED,
+ EnumSet.of(RMAppAttemptEventType.ATTEMPT_ADDED,
RMAppAttemptEventType.EXPIRE,
RMAppAttemptEventType.LAUNCHED,
RMAppAttemptEventType.LAUNCH_FAILED,
@@ -398,7 +390,7 @@ public class RMAppAttemptImpl implements
RMContext rmContext, YarnScheduler scheduler,
ApplicationMasterService masterService,
ApplicationSubmissionContext submissionContext,
- Configuration conf, String user) {
+ Configuration conf) {
this.conf = conf;
this.applicationAttemptId = appAttemptId;
this.rmContext = rmContext;
@@ -414,7 +406,6 @@ public class RMAppAttemptImpl implements
this.proxiedTrackingUrl = generateProxyUriWithScheme(null);
this.stateMachine = stateMachineFactory.make(this);
- this.user = user;
}
@Override
@@ -750,35 +741,8 @@ public class RMAppAttemptImpl implements
appAttempt.rmContext.getAMRMTokenSecretManager());
// Add the applicationAttempt to the scheduler
- appAttempt.eventHandler.handle(
- new AppAttemptAddedSchedulerEvent(appAttempt.applicationAttemptId,
- appAttempt.submissionContext.getQueue(), appAttempt.user));
- }
- }
-
- private static final class AppRejectedTransition extends BaseTransition {
- @Override
- public void transition(RMAppAttemptImpl appAttempt,
- RMAppAttemptEvent event) {
-
- RMAppAttemptRejectedEvent rejectedEvent = (RMAppAttemptRejectedEvent) event;
-
- // Tell the AMS. Unregister from the ApplicationMasterService
- appAttempt.masterService
- .unregisterAttempt(appAttempt.applicationAttemptId);
-
- // Save the diagnostic message
- String message = rejectedEvent.getMessage();
- appAttempt.diagnostics.append(message);
-
- // Send the rejection event to app
- appAttempt.eventHandler.handle(
- new RMAppRejectedEvent(
- rejectedEvent.getApplicationAttemptId().getApplicationId(),
- message)
- );
-
- appAttempt.removeCredentials(appAttempt);
+ appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
+ appAttempt.applicationAttemptId));
}
}
@@ -794,11 +758,6 @@ public class RMAppAttemptImpl implements
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
if (!appAttempt.submissionContext.getUnmanagedAM()) {
- // Send the acceptance to the app
- appAttempt.eventHandler.handle(new RMAppEvent(event
- .getApplicationAttemptId().getApplicationId(),
- RMAppEventType.APP_ACCEPTED));
-
// Request a container for the AM.
ResourceRequest request =
BuilderUtils.newResourceRequest(
@@ -918,11 +877,6 @@ public class RMAppAttemptImpl implements
FinalApplicationStatus finalStatus = null;
switch (event.getType()) {
- case APP_REJECTED:
- RMAppAttemptRejectedEvent rejectedEvent =
- (RMAppAttemptRejectedEvent) event;
- diags = rejectedEvent.getMessage();
- break;
case LAUNCH_FAILED:
RMAppAttemptLaunchFailedEvent launchFaileEvent =
(RMAppAttemptLaunchFailedEvent) event;
@@ -1091,16 +1045,6 @@ public class RMAppAttemptImpl implements
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
appAttempt.checkAttemptStoreError(event);
- // Send the acceptance to the app
- // Ideally this should have been done when the scheduler accepted the app.
- // But its here because until the attempt is saved the client should not
- // launch the unmanaged AM. Client waits for the app status to be accepted
- // before doing so. So we have to delay the accepted state until we have
- // completed storing the attempt
- appAttempt.eventHandler.handle(new RMAppEvent(event
- .getApplicationAttemptId().getApplicationId(),
- RMAppEventType.APP_ACCEPTED));
-
super.transition(appAttempt, event);
}
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java?rev=1554898&r1=1554897&r2=1554898&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java Thu Jan 2 20:21:03 2014
@@ -56,7 +56,7 @@ public class ActiveUsersManager {
* @param user application user
* @param applicationId activated application
*/
- @Lock({Queue.class, SchedulerApplication.class})
+ @Lock({Queue.class, SchedulerApplicationAttempt.class})
synchronized public void activateApplication(
String user, ApplicationId applicationId) {
Set<ApplicationId> userApps = usersApplications.get(user);
@@ -79,7 +79,7 @@ public class ActiveUsersManager {
* @param user application user
* @param applicationId deactivated application
*/
- @Lock({Queue.class, SchedulerApplication.class})
+ @Lock({Queue.class, SchedulerApplicationAttempt.class})
synchronized public void deactivateApplication(
String user, ApplicationId applicationId) {
Set<ApplicationId> userApps = usersApplications.get(user);
@@ -102,7 +102,7 @@ public class ActiveUsersManager {
* resource requests.
* @return number of active users
*/
- @Lock({Queue.class, SchedulerApplication.class})
+ @Lock({Queue.class, SchedulerApplicationAttempt.class})
synchronized public int getNumActiveUsers() {
return activeUsers;
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java?rev=1554898&r1=1554897&r2=1554898&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java Thu Jan 2 20:21:03 2014
@@ -36,7 +36,7 @@ public class SchedulerAppReport {
private final Collection<RMContainer> reserved;
private final boolean pending;
- public SchedulerAppReport(SchedulerApplication app) {
+ public SchedulerAppReport(SchedulerApplicationAttempt app) {
this.live = app.getLiveContainers();
this.reserved = app.getReservedContainers();
this.pending = app.isPending();
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppUtils.java?rev=1554898&r1=1554897&r2=1554898&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppUtils.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppUtils.java Thu Jan 2 20:21:03 2014
@@ -22,7 +22,7 @@ import org.apache.commons.logging.Log;
public class SchedulerAppUtils {
- public static boolean isBlacklisted(SchedulerApplication application,
+ public static boolean isBlacklisted(SchedulerApplicationAttempt application,
SchedulerNode node, Log LOG) {
if (application.isBlacklisted(node.getNodeName())) {
if (LOG.isDebugEnabled()) {
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java?rev=1554898&r1=1554897&r2=1554898&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java Thu Jan 2 20:21:03 2014
@@ -17,393 +17,26 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Multiset;
-/**
- * Represents an application attempt from the viewpoint of the scheduler.
- * Each running app attempt in the RM corresponds to one instance
- * of this class.
- */
@Private
@Unstable
-public abstract class SchedulerApplication {
-
- private static final Log LOG = LogFactory.getLog(SchedulerApplication.class);
-
- protected final AppSchedulingInfo appSchedulingInfo;
-
- protected final Map<ContainerId, RMContainer> liveContainers =
- new HashMap<ContainerId, RMContainer>();
- protected final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
- new HashMap<Priority, Map<NodeId, RMContainer>>();
-
- private final Multiset<Priority> reReservations = HashMultiset.create();
-
- protected final Resource currentReservation = Resource.newInstance(0, 0);
- private Resource resourceLimit = Resource.newInstance(0, 0);
- protected final Resource currentConsumption = Resource.newInstance(0, 0);
+public class SchedulerApplication {
- protected List<RMContainer> newlyAllocatedContainers =
- new ArrayList<RMContainer>();
+ private final Queue queue;
+ private final String user;
- /**
- * Count how many times the application has been given an opportunity
- * to schedule a task at each priority. Each time the scheduler
- * asks the application for a task at this priority, it is incremented,
- * and each time the application successfully schedules a task, it
- * is reset to 0.
- */
- Multiset<Priority> schedulingOpportunities = HashMultiset.create();
-
- // Time of the last container scheduled at the current allowed level
- protected Map<Priority, Long> lastScheduledContainer =
- new HashMap<Priority, Long>();
-
- protected final Queue queue;
- protected boolean isStopped = false;
-
- protected final RMContext rmContext;
-
- public SchedulerApplication(ApplicationAttemptId applicationAttemptId,
- String user, Queue queue, ActiveUsersManager activeUsersManager,
- RMContext rmContext) {
- this.rmContext = rmContext;
- this.appSchedulingInfo =
- new AppSchedulingInfo(applicationAttemptId, user, queue,
- activeUsersManager);
+ public SchedulerApplication(Queue queue, String user) {
this.queue = queue;
- }
-
- /**
- * Get the live containers of the application.
- * @return live containers of the application
- */
- public synchronized Collection<RMContainer> getLiveContainers() {
- return new ArrayList<RMContainer>(liveContainers.values());
- }
-
- /**
- * Is this application pending?
- * @return true if it is else false.
- */
- public boolean isPending() {
- return appSchedulingInfo.isPending();
- }
-
- /**
- * Get {@link ApplicationAttemptId} of the application master.
- * @return <code>ApplicationAttemptId</code> of the application master
- */
- public ApplicationAttemptId getApplicationAttemptId() {
- return appSchedulingInfo.getApplicationAttemptId();
- }
-
- public ApplicationId getApplicationId() {
- return appSchedulingInfo.getApplicationId();
- }
-
- public String getUser() {
- return appSchedulingInfo.getUser();
- }
-
- public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
- return appSchedulingInfo.getResourceRequests(priority);
- }
-
- public int getNewContainerId() {
- return appSchedulingInfo.getNewContainerId();
- }
-
- public Collection<Priority> getPriorities() {
- return appSchedulingInfo.getPriorities();
- }
-
- public ResourceRequest getResourceRequest(Priority priority, String resourceName) {
- return this.appSchedulingInfo.getResourceRequest(priority, resourceName);
- }
-
- public synchronized int getTotalRequiredResources(Priority priority) {
- return getResourceRequest(priority, ResourceRequest.ANY).getNumContainers();
- }
-
- public Resource getResource(Priority priority) {
- return appSchedulingInfo.getResource(priority);
- }
-
- public String getQueueName() {
- return appSchedulingInfo.getQueueName();
- }
-
- public synchronized RMContainer getRMContainer(ContainerId id) {
- return liveContainers.get(id);
- }
-
- protected synchronized void resetReReservations(Priority priority) {
- reReservations.setCount(priority, 0);
- }
-
- protected synchronized void addReReservation(Priority priority) {
- reReservations.add(priority);
- }
-
- public synchronized int getReReservations(Priority priority) {
- return reReservations.count(priority);
+ this.user = user;
}
- /**
- * Get total current reservations.
- * Used only by unit tests
- * @return total current reservations
- */
- @Stable
- @Private
- public synchronized Resource getCurrentReservation() {
- return currentReservation;
- }
-
public Queue getQueue() {
return queue;
}
-
- public synchronized void updateResourceRequests(
- List<ResourceRequest> requests) {
- if (!isStopped) {
- appSchedulingInfo.updateResourceRequests(requests);
- }
- }
-
- public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
- // Cleanup all scheduling information
- isStopped = true;
- appSchedulingInfo.stop(rmAppAttemptFinalState);
- }
-
- public synchronized boolean isStopped() {
- return isStopped;
- }
-
- /**
- * Get the list of reserved containers
- * @return All of the reserved containers.
- */
- public synchronized List<RMContainer> getReservedContainers() {
- List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
- for (Map.Entry<Priority, Map<NodeId, RMContainer>> e :
- this.reservedContainers.entrySet()) {
- reservedContainers.addAll(e.getValue().values());
- }
- return reservedContainers;
- }
-
- public synchronized RMContainer reserve(SchedulerNode node, Priority priority,
- RMContainer rmContainer, Container container) {
- // Create RMContainer if necessary
- if (rmContainer == null) {
- rmContainer =
- new RMContainerImpl(container, getApplicationAttemptId(),
- node.getNodeID(), rmContext.getDispatcher().getEventHandler(),
- rmContext.getContainerAllocationExpirer());
-
- Resources.addTo(currentReservation, container.getResource());
-
- // Reset the re-reservation count
- resetReReservations(priority);
- } else {
- // Note down the re-reservation
- addReReservation(priority);
- }
- rmContainer.handle(new RMContainerReservedEvent(container.getId(),
- container.getResource(), node.getNodeID(), priority));
-
- Map<NodeId, RMContainer> reservedContainers =
- this.reservedContainers.get(priority);
- if (reservedContainers == null) {
- reservedContainers = new HashMap<NodeId, RMContainer>();
- this.reservedContainers.put(priority, reservedContainers);
- }
- reservedContainers.put(node.getNodeID(), rmContainer);
-
- LOG.info("Application " + getApplicationId()
- + " reserved container " + rmContainer
- + " on node " + node + ", currently has " + reservedContainers.size()
- + " at priority " + priority
- + "; currentReservation " + currentReservation.getMemory());
-
- return rmContainer;
- }
-
- /**
- * Has the application reserved the given <code>node</code> at the
- * given <code>priority</code>?
- * @param node node to be checked
- * @param priority priority of reserved container
- * @return true is reserved, false if not
- */
- public synchronized boolean isReserved(SchedulerNode node, Priority priority) {
- Map<NodeId, RMContainer> reservedContainers =
- this.reservedContainers.get(priority);
- if (reservedContainers != null) {
- return reservedContainers.containsKey(node.getNodeID());
- }
- return false;
- }
-
- public synchronized void setHeadroom(Resource globalLimit) {
- this.resourceLimit = globalLimit;
- }
-
- /**
- * Get available headroom in terms of resources for the application's user.
- * @return available resource headroom
- */
- public synchronized Resource getHeadroom() {
- // Corner case to deal with applications being slightly over-limit
- if (resourceLimit.getMemory() < 0) {
- resourceLimit.setMemory(0);
- }
-
- return resourceLimit;
- }
-
- public synchronized int getNumReservedContainers(Priority priority) {
- Map<NodeId, RMContainer> reservedContainers =
- this.reservedContainers.get(priority);
- return (reservedContainers == null) ? 0 : reservedContainers.size();
- }
-
- @SuppressWarnings("unchecked")
- public synchronized void containerLaunchedOnNode(ContainerId containerId,
- NodeId nodeId) {
- // Inform the container
- RMContainer rmContainer = getRMContainer(containerId);
- if (rmContainer == null) {
- // Some unknown container sneaked into the system. Kill it.
- rmContext.getDispatcher().getEventHandler()
- .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
- return;
- }
-
- rmContainer.handle(new RMContainerEvent(containerId,
- RMContainerEventType.LAUNCHED));
- }
-
- public synchronized void showRequests() {
- if (LOG.isDebugEnabled()) {
- for (Priority priority : getPriorities()) {
- Map<String, ResourceRequest> requests = getResourceRequests(priority);
- if (requests != null) {
- LOG.debug("showRequests:" + " application=" + getApplicationId() +
- " headRoom=" + getHeadroom() +
- " currentConsumption=" + currentConsumption.getMemory());
- for (ResourceRequest request : requests.values()) {
- LOG.debug("showRequests:" + " application=" + getApplicationId()
- + " request=" + request);
- }
- }
- }
- }
- }
-
- public Resource getCurrentConsumption() {
- return currentConsumption;
- }
-
- public synchronized List<Container> pullNewlyAllocatedContainers() {
- List<Container> returnContainerList = new ArrayList<Container>(
- newlyAllocatedContainers.size());
- for (RMContainer rmContainer : newlyAllocatedContainers) {
- rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
- RMContainerEventType.ACQUIRED));
- returnContainerList.add(rmContainer.getContainer());
- }
- newlyAllocatedContainers.clear();
- return returnContainerList;
- }
- public synchronized void updateBlacklist(
- List<String> blacklistAdditions, List<String> blacklistRemovals) {
- if (!isStopped) {
- this.appSchedulingInfo.updateBlacklist(
- blacklistAdditions, blacklistRemovals);
- }
- }
-
- public boolean isBlacklisted(String resourceName) {
- return this.appSchedulingInfo.isBlacklisted(resourceName);
- }
-
- public synchronized void addSchedulingOpportunity(Priority priority) {
- schedulingOpportunities.setCount(priority,
- schedulingOpportunities.count(priority) + 1);
- }
-
- public synchronized void subtractSchedulingOpportunity(Priority priority) {
- int count = schedulingOpportunities.count(priority) - 1;
- this.schedulingOpportunities.setCount(priority, Math.max(count, 0));
- }
-
- /**
- * Return the number of times the application has been given an opportunity
- * to schedule a task at the given priority since the last time it
- * successfully did so.
- */
- public synchronized int getSchedulingOpportunities(Priority priority) {
- return schedulingOpportunities.count(priority);
- }
-
- /**
- * Should be called when an application has successfully scheduled a container,
- * or when the scheduling locality threshold is relaxed.
- * Reset various internal counters which affect delay scheduling
- *
- * @param priority The priority of the container scheduled.
- */
- public synchronized void resetSchedulingOpportunities(Priority priority) {
- resetSchedulingOpportunities(priority, System.currentTimeMillis());
- }
- // used for continuous scheduling
- public synchronized void resetSchedulingOpportunities(Priority priority,
- long currentTimeMs) {
- lastScheduledContainer.put(priority, currentTimeMs);
- schedulingOpportunities.setCount(priority, 0);
- }
-
- public synchronized ApplicationResourceUsageReport getResourceUsageReport() {
- return ApplicationResourceUsageReport.newInstance(liveContainers.size(),
- reservedContainers.size(), Resources.clone(currentConsumption),
- Resources.clone(currentReservation),
- Resources.add(currentConsumption, currentReservation));
+ public String getUser() {
+ return user;
}
-
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java?rev=1554898&r1=1554897&r2=1554898&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java Thu Jan 2 20:21:03 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -35,7 +36,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -155,21 +155,32 @@ extends org.apache.hadoop.yarn.server.re
/**
* Submit a new application to the queue.
- * @param application application being submitted
+ * @param applicationId the applicationId of the application being submitted
* @param user user who submitted the application
* @param queue queue to which the application is submitted
*/
- public void submitApplication(FiCaSchedulerApp application, String user,
- String queue)
- throws AccessControlException;
-
+ public void submitApplication(ApplicationId applicationId, String user,
+ String queue) throws AccessControlException;
+
+ /**
+ * Submit an application attempt to the queue.
+ */
+ public void submitApplicationAttempt(FiCaSchedulerApp application,
+ String userName);
+
/**
* An application submitted to this queue has finished.
- * @param application
- * @param queue application queue
+ * @param applicationId the applicationId of the application that has finished
+ * @param user user who submitted the application
*/
- public void finishApplication(FiCaSchedulerApp application, String queue);
-
+ public void finishApplication(ApplicationId applicationId, String user);
+
+ /**
+ * An application attempt submitted to this queue has finished.
+ */
+ public void finishApplicationAttempt(FiCaSchedulerApp application,
+ String queue);
+
/**
* Assign containers to applications in the queue or it's children (if any).
* @param clusterResource the resource of the cluster.
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1554898&r1=1554897&r2=1554898&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Thu Jan 2 20:21:03 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -53,10 +54,13 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -65,14 +69,16 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -185,7 +191,11 @@ public class CapacityScheduler
private Resource maximumAllocation;
@VisibleForTesting
- protected Map<ApplicationAttemptId, FiCaSchedulerApp> applications =
+ protected Map<ApplicationId, SchedulerApplication> applications =
+ new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
+
+ @VisibleForTesting
+ protected Map<ApplicationAttemptId, FiCaSchedulerApp> appAttempts =
new ConcurrentHashMap<ApplicationAttemptId, FiCaSchedulerApp>();
private boolean initialized = false;
@@ -415,61 +425,84 @@ public class CapacityScheduler
synchronized CSQueue getQueue(String queueName) {
return queues.get(queueName);
}
-
- private synchronized void
- addApplicationAttempt(ApplicationAttemptId applicationAttemptId,
- String queueName, String user) {
- // Sanity checks
+ private synchronized void addApplication(ApplicationId applicationId,
+ String queueName, String user) {
+ // sanity checks.
CSQueue queue = getQueue(queueName);
if (queue == null) {
- String message = "Application " + applicationAttemptId +
+ String message = "Application " + applicationId +
" submitted by user " + user + " to unknown queue: " + queueName;
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptRejectedEvent(applicationAttemptId, message));
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppRejectedEvent(applicationId, message));
return;
}
if (!(queue instanceof LeafQueue)) {
- String message = "Application " + applicationAttemptId +
+ String message = "Application " + applicationId +
" submitted by user " + user + " to non-leaf queue: " + queueName;
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptRejectedEvent(applicationAttemptId, message));
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppRejectedEvent(applicationId, message));
return;
}
-
- // TODO: Fix store
- FiCaSchedulerApp SchedulerApp =
- new FiCaSchedulerApp(applicationAttemptId, user, queue,
- queue.getActiveUsersManager(), rmContext);
-
// Submit to the queue
try {
- queue.submitApplication(SchedulerApp, user, queueName);
+ queue.submitApplication(applicationId, user, queueName);
} catch (AccessControlException ace) {
- LOG.info("Failed to submit application " + applicationAttemptId +
- " to queue " + queueName + " from user " + user, ace);
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptRejectedEvent(applicationAttemptId,
- ace.toString()));
+ LOG.info("Failed to submit application " + applicationId + " to queue "
+ + queueName + " from user " + user, ace);
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppRejectedEvent(applicationId, ace.toString()));
return;
}
+ SchedulerApplication application =
+ new SchedulerApplication(queue, user);
+ applications.put(applicationId, application);
+ LOG.info("Accepted application " + applicationId + " from user: " + user
+ + ", in queue: " + queueName);
+ rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+ }
- applications.put(applicationAttemptId, SchedulerApp);
-
- LOG.info("Application Submission: " + applicationAttemptId +
- ", user: " + user +
- " queue: " + queue +
- ", currently active: " + applications.size());
-
+ private synchronized void addApplicationAttempt(
+ ApplicationAttemptId applicationAttemptId) {
+ SchedulerApplication application =
+ applications.get(applicationAttemptId.getApplicationId());
+ CSQueue queue = (CSQueue) application.getQueue();
+
+ FiCaSchedulerApp SchedulerApp =
+ new FiCaSchedulerApp(applicationAttemptId, application.getUser(),
+ queue, queue.getActiveUsersManager(), rmContext);
+ appAttempts.put(applicationAttemptId, SchedulerApp);
+ queue.submitApplicationAttempt(SchedulerApp, application.getUser());
+ LOG.info("Added Application Attempt " + applicationAttemptId
+ + " to scheduler from user " + application.getUser() + " in queue "
+ + queue.getQueueName());
rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptEvent(applicationAttemptId,
- RMAppAttemptEventType.APP_ACCEPTED));
+ new RMAppAttemptEvent(applicationAttemptId,
+ RMAppAttemptEventType.ATTEMPT_ADDED));
+ }
+
+ private synchronized void doneApplication(ApplicationId applicationId,
+ RMAppState finalState) {
+ SchedulerApplication application = applications.get(applicationId);
+ if (application == null){
+ // The AppRemovedSchedulerEvent may be sent on recovery for completed apps.
+ return;
+ }
+ CSQueue queue = (CSQueue) application.getQueue();
+ if (!(queue instanceof LeafQueue)) {
+ LOG.error("Cannot finish application " + "from non-leaf queue: "
+ + queue.getQueueName());
+ } else {
+ queue.finishApplication(applicationId, application.getUser());
+ }
+ applications.remove(applicationId);
}
private synchronized void doneApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
RMAppAttemptState rmAppAttemptFinalState) {
- LOG.info("Application " + applicationAttemptId + " is done." +
+ LOG.info("Application Attempt " + applicationAttemptId + " is done." +
" finalState=" + rmAppAttemptFinalState);
FiCaSchedulerApp application = getApplication(applicationAttemptId);
@@ -509,11 +542,11 @@ public class CapacityScheduler
LOG.error("Cannot finish application " + "from non-leaf queue: "
+ queueName);
} else {
- queue.finishApplication(application, queue.getQueueName());
+ queue.finishApplicationAttempt(application, queue.getQueueName());
}
// Remove from our data-structure
- applications.remove(applicationAttemptId);
+ appAttempts.remove(applicationAttemptId);
}
private static final Allocation EMPTY_ALLOCATION =
@@ -740,12 +773,25 @@ public class CapacityScheduler
nodeUpdate(nodeUpdatedEvent.getRMNode());
}
break;
+ case APP_ADDED:
+ {
+ AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
+ addApplication(appAddedEvent.getApplicationId(),
+ appAddedEvent.getQueue(), appAddedEvent.getUser());
+ }
+ break;
+ case APP_REMOVED:
+ {
+ AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
+ doneApplication(appRemovedEvent.getApplicationID(),
+ appRemovedEvent.getFinalState());
+ }
+ break;
case APP_ATTEMPT_ADDED:
{
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
(AppAttemptAddedSchedulerEvent) event;
- addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
- appAttemptAddedEvent.getQueue(), appAttemptAddedEvent.getUser());
+ addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId());
}
break;
case APP_ATTEMPT_REMOVED:
@@ -854,7 +900,7 @@ public class CapacityScheduler
@Lock(Lock.NoLock.class)
FiCaSchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) {
- return applications.get(applicationAttemptId);
+ return appAttempts.get(applicationAttemptId);
}
@Override
@@ -912,7 +958,7 @@ public class CapacityScheduler
LOG.debug("PREEMPT_CONTAINER: application:" + aid.toString() +
" container: " + cont.toString());
}
- FiCaSchedulerApp app = applications.get(aid);
+ FiCaSchedulerApp app = appAttempts.get(aid);
if (app != null) {
app.addPreemptContainer(cont.getContainerId());
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1554898&r1=1554897&r2=1554898&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Thu Jan 2 20:21:03 2014
@@ -38,6 +38,7 @@ import org.apache.hadoop.security.Access
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -59,7 +60,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -99,7 +99,7 @@ public class LeafQueue implements CSQueu
private volatile int numContainers;
Set<FiCaSchedulerApp> activeApplications;
- Map<ApplicationAttemptId, FiCaSchedulerApp> applicationsMap =
+ Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
new HashMap<ApplicationAttemptId, FiCaSchedulerApp>();
Set<FiCaSchedulerApp> pendingApplications;
@@ -635,7 +635,22 @@ public class LeafQueue implements CSQueu
}
@Override
- public void submitApplication(FiCaSchedulerApp application, String userName,
+ public void submitApplicationAttempt(FiCaSchedulerApp application,
+ String userName) {
+ // Careful! Locking order is important!
+ synchronized (this) {
+ User user = getUser(userName);
+ // Add the attempt to our data-structures
+ addApplicationAttempt(application, user);
+ }
+
+ int attemptId = application.getApplicationAttemptId().getAttemptId();
+ metrics.submitApp(userName, attemptId);
+ getParent().submitApplicationAttempt(application, userName);
+ }
+
+ @Override
+ public void submitApplication(ApplicationId applicationId, String userName,
String queue) throws AccessControlException {
// Careful! Locking order is important!
@@ -653,8 +668,7 @@ public class LeafQueue implements CSQueu
// Check if the queue is accepting jobs
if (getState() != QueueState.RUNNING) {
String msg = "Queue " + getQueuePath() +
- " is STOPPED. Cannot accept submission of application: " +
- application.getApplicationId();
+ " is STOPPED. Cannot accept submission of application: " + applicationId;
LOG.info(msg);
throw new AccessControlException(msg);
}
@@ -663,8 +677,7 @@ public class LeafQueue implements CSQueu
if (getNumApplications() >= getMaxApplications()) {
String msg = "Queue " + getQueuePath() +
" already has " + getNumApplications() + " applications," +
- " cannot accept submission of application: " +
- application.getApplicationId();
+ " cannot accept submission of application: " + applicationId;
LOG.info(msg);
throw new AccessControlException(msg);
}
@@ -675,26 +688,18 @@ public class LeafQueue implements CSQueu
String msg = "Queue " + getQueuePath() +
" already has " + user.getTotalApplications() +
" applications from user " + userName +
- " cannot accept submission of application: " +
- application.getApplicationId();
+ " cannot accept submission of application: " + applicationId;
LOG.info(msg);
throw new AccessControlException(msg);
}
-
- // Add the application to our data-structures
- addApplication(application, user);
}
- int attemptId = application.getApplicationAttemptId().getAttemptId();
- metrics.submitApp(userName, attemptId);
-
// Inform the parent queue
try {
- getParent().submitApplication(application, userName, queue);
+ getParent().submitApplication(applicationId, userName, queue);
} catch (AccessControlException ace) {
LOG.info("Failed to submit application to parent-queue: " +
getParent().getQueuePath(), ace);
- removeApplication(application, user);
throw ace;
}
}
@@ -722,11 +727,11 @@ public class LeafQueue implements CSQueu
}
}
- private synchronized void addApplication(FiCaSchedulerApp application, User user) {
+ private synchronized void addApplicationAttempt(FiCaSchedulerApp application, User user) {
// Accept
user.submitApplication();
pendingApplications.add(application);
- applicationsMap.put(application.getApplicationAttemptId(), application);
+ applicationAttemptMap.put(application.getApplicationAttemptId(), application);
// Activate applications
activateApplications();
@@ -742,22 +747,28 @@ public class LeafQueue implements CSQueu
}
@Override
- public void finishApplication(FiCaSchedulerApp application, String queue) {
+ public void finishApplication(ApplicationId application, String user) {
+ // Inform the activeUsersManager
+ activeUsersManager.deactivateApplication(user, application);
+ // Inform the parent queue
+ getParent().finishApplication(application, user);
+ }
+
+ @Override
+ public void finishApplicationAttempt(FiCaSchedulerApp application, String queue) {
// Careful! Locking order is important!
synchronized (this) {
- removeApplication(application, getUser(application.getUser()));
+ removeApplicationAttempt(application, getUser(application.getUser()));
}
-
- // Inform the parent queue
- getParent().finishApplication(application, queue);
+ getParent().finishApplicationAttempt(application, queue);
}
- public synchronized void removeApplication(FiCaSchedulerApp application, User user) {
+ public synchronized void removeApplicationAttempt(FiCaSchedulerApp application, User user) {
boolean wasActive = activeApplications.remove(application);
if (!wasActive) {
pendingApplications.remove(application);
}
- applicationsMap.remove(application.getApplicationAttemptId());
+ applicationAttemptMap.remove(application.getApplicationAttemptId());
user.finishApplication(wasActive);
if (user.getTotalApplications() == 0) {
@@ -766,13 +777,7 @@ public class LeafQueue implements CSQueu
// Check if we can activate more applications
activateApplications();
-
- // Inform the activeUsersManager
- synchronized (application) {
- activeUsersManager.deactivateApplication(
- application.getUser(), application.getApplicationId());
- }
-
+
LOG.info("Application removed -" +
" appId: " + application.getApplicationId() +
" user: " + application.getUser() +
@@ -783,10 +788,10 @@ public class LeafQueue implements CSQueu
" #queue-active-applications: " + getNumActiveApplications()
);
}
-
+
private synchronized FiCaSchedulerApp getApplication(
ApplicationAttemptId applicationAttemptId) {
- return applicationsMap.get(applicationAttemptId);
+ return applicationAttemptMap.get(applicationAttemptId);
}
private static final CSAssignment NULL_ASSIGNMENT =
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1554898&r1=1554897&r2=1554898&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Thu Jan 2 20:21:03 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.security.Access
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -51,7 +52,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -442,7 +442,7 @@ public class ParentQueue implements CSQu
}
@Override
- public void submitApplication(FiCaSchedulerApp application, String user,
+ public void submitApplication(ApplicationId applicationId, String user,
String queue) throws AccessControlException {
synchronized (this) {
@@ -455,57 +455,70 @@ public class ParentQueue implements CSQu
if (state != QueueState.RUNNING) {
throw new AccessControlException("Queue " + getQueuePath() +
" is STOPPED. Cannot accept submission of application: " +
- application.getApplicationId());
+ applicationId);
}
- addApplication(application, user);
+ addApplication(applicationId, user);
}
// Inform the parent queue
if (parent != null) {
try {
- parent.submitApplication(application, user, queue);
+ parent.submitApplication(applicationId, user, queue);
} catch (AccessControlException ace) {
LOG.info("Failed to submit application to parent-queue: " +
parent.getQueuePath(), ace);
- removeApplication(application, user);
+ removeApplication(applicationId, user);
throw ace;
}
}
}
- private synchronized void addApplication(FiCaSchedulerApp application,
+
+ @Override
+ public void submitApplicationAttempt(FiCaSchedulerApp application,
+ String userName) {
+ // submit attempt logic.
+ }
+
+ @Override
+ public void finishApplicationAttempt(FiCaSchedulerApp application,
+ String queue) {
+ // finish attempt logic.
+ }
+
+ private synchronized void addApplication(ApplicationId applicationId,
String user) {
-
+
++numApplications;
LOG.info("Application added -" +
- " appId: " + application.getApplicationId() +
+ " appId: " + applicationId +
" user: " + user +
" leaf-queue of parent: " + getQueueName() +
" #applications: " + getNumApplications());
}
@Override
- public void finishApplication(FiCaSchedulerApp application, String queue) {
+ public void finishApplication(ApplicationId application, String user) {
synchronized (this) {
- removeApplication(application, application.getUser());
+ removeApplication(application, user);
}
// Inform the parent queue
if (parent != null) {
- parent.finishApplication(application, queue);
+ parent.finishApplication(application, user);
}
}
- public synchronized void removeApplication(FiCaSchedulerApp application,
+ public synchronized void removeApplication(ApplicationId applicationId,
String user) {
--numApplications;
LOG.info("Application removed -" +
- " appId: " + application.getApplicationId() +
+ " appId: " + applicationId +
" user: " + user +
" leaf-queue of parent: " + getQueueName() +
" #applications: " + getNumApplications());
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java?rev=1554898&r1=1554897&r2=1554898&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java Thu Jan 2 20:21:03 2014
@@ -47,7 +47,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -57,7 +57,7 @@ import org.apache.hadoop.yarn.util.resou
*/
@Private
@Unstable
-public class FiCaSchedulerApp extends SchedulerApplication {
+public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
private static final Log LOG = LogFactory.getLog(FiCaSchedulerApp.class);
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java?rev=1554898&r1=1554897&r2=1554898&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java Thu Jan 2 20:21:03 2014
@@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.factories.
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -206,7 +206,7 @@ public class FiCaSchedulerNode extends S
}
public synchronized void reserveResource(
- SchedulerApplication application, Priority priority,
+ SchedulerApplicationAttempt application, Priority priority,
RMContainer reservedContainer) {
// Check if it's already reserved
if (this.reservedContainer != null) {
@@ -241,7 +241,7 @@ public class FiCaSchedulerNode extends S
}
public synchronized void unreserveResource(
- SchedulerApplication application) {
+ SchedulerApplicationAttempt application) {
// adding NP checks as this can now be called for preemption
if (reservedContainer != null
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java?rev=1554898&r1=1554897&r2=1554898&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java Thu Jan 2 20:21:03 2014
@@ -23,27 +23,14 @@ import org.apache.hadoop.yarn.api.record
public class AppAttemptAddedSchedulerEvent extends SchedulerEvent {
private final ApplicationAttemptId applicationAttemptId;
- private final String queue;
- private final String user;
public AppAttemptAddedSchedulerEvent(
- ApplicationAttemptId applicationAttemptId, String queue, String user) {
+ ApplicationAttemptId applicationAttemptId) {
super(SchedulerEventType.APP_ATTEMPT_ADDED);
this.applicationAttemptId = applicationAttemptId;
- this.queue = queue;
- this.user = user;
}
public ApplicationAttemptId getApplicationAttemptId() {
return applicationAttemptId;
}
-
- public String getQueue() {
- return queue;
- }
-
- public String getUser() {
- return user;
- }
-
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java?rev=1554898&r1=1554897&r2=1554898&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java Thu Jan 2 20:21:03 2014
@@ -24,7 +24,11 @@ public enum SchedulerEventType {
NODE_ADDED,
NODE_REMOVED,
NODE_UPDATE,
-
+
+ // Source: RMApp
+ APP_ADDED,
+ APP_REMOVED,
+
// Source: RMAppAttempt
APP_ATTEMPT_ADDED,
APP_ATTEMPT_REMOVED,
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1554898&r1=1554897&r2=1554898&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Thu Jan 2 20:21:03 2014
@@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@Private
@Unstable