You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/03/24 08:52:36 UTC
svn commit: r1084866 [1/2] - in /hadoop/mapreduce/branches/MR-279/yarn:
yarn-api/src/main/avro/
yarn-common/src/main/java/org/apache/hadoop/yarn/conf/
yarn-server/yarn-server-common/src/main/resources/
yarn-server/yarn-server-resourcemanager/src/main/j...
Author: mahadev
Date: Thu Mar 24 07:52:35 2011
New Revision: 1084866
URL: http://svn.apache.org/viewvc?rev=1084866&view=rev
Log:
Restart/clean up an application on AM failure, up to a configurable number of times. Contributed by Mahadev Konar.
Added:
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeResponse.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
Removed:
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTracker.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterTracker.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterTrackerImpl.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterTracker.java
Modified:
hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/avro/yarn-types.genavro
hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/resources/yarn-default.xml
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AppContext.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterInfo.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/events/ApplicationMasterEvents.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ResourceContext.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/avro/yarn-types.genavro
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/avro/yarn-types.genavro?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/avro/yarn-types.genavro (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/avro/yarn-types.genavro Thu Mar 24 07:52:35 2011
@@ -48,6 +48,7 @@ protocol types {
PENDING,
ALLOCATING,
ALLOCATED,
+ EXPIRED_PENDING,
LAUNCHING,
LAUNCHED,
RUNNING,
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Thu Mar 24 07:52:35 2011
@@ -31,6 +31,11 @@ public class YarnConfiguration extends C
public static final String AM_EXPIRY_INTERVAL = RM_PREFIX
+ "application.expiry.interval";
+ public static final String AM_MAX_RETRIES = RM_PREFIX
+ + "application.max.retries";
+
+ public static final int DEFAULT_AM_MAX_RETRIES = 3;
+
public static final long DEFAULT_AM_EXPIRY_INTERVAL = 60000L;
public static final String NM_EXPIRY_INTERVAL = RM_PREFIX
@@ -92,7 +97,7 @@ public class YarnConfiguration extends C
"AppClientTokenEnv";
public static final String RESOURCE_SCHEDULER = RM_PREFIX + "scheduler";
-
+
public YarnConfiguration() {
super();
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/resources/yarn-default.xml?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/resources/yarn-default.xml Thu Mar 24 07:52:35 2011
@@ -30,6 +30,12 @@
</property>
<property>
+ <name>yarn.server.resourcemanager.application.max.retries</name>
+ <value>1</value>
+ <description>The number of times an application will be retried in case
+ of AM failure.</description>
+ </property>
+ <property>
<name>yarn.server.resourcemanager.keytab</name>
<value>/etc/krb5.keytab</value>
</property>
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Thu Mar 24 07:52:35 2011
@@ -29,18 +29,18 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationMaster;
+import org.apache.hadoop.yarn.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.ClientRMProtocol;
+import org.apache.hadoop.yarn.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTracker;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ResourceContext;
import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.yarn.ApplicationMaster;
-import org.apache.hadoop.yarn.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.ClientRMProtocol;
-import org.apache.hadoop.yarn.YarnClusterMetrics;
/**
* The client interface to the Resource Manager. This module handles all the rpc
@@ -48,14 +48,14 @@ import org.apache.hadoop.yarn.YarnCluste
*/
public class ClientRMService extends AbstractService implements ClientRMProtocol {
private static final Log LOG = LogFactory.getLog(ClientRMService.class);
- private RMResourceTracker clusterInfo;
+ private ResourceContext clusterInfo;
private ApplicationsManager applicationsManager;
private String clientServiceBindAddress;
private Server server;
InetSocketAddress clientBindAddress;
public ClientRMService(ApplicationsManager applicationsManager,
- RMResourceTracker clusterInfo) {
+ ResourceContext clusterInfo) {
super(ClientRMService.class.getName());
this.clusterInfo = clusterInfo;
this.applicationsManager = applicationsManager;
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Thu Mar 24 07:52:35 2011
@@ -32,7 +32,9 @@ import org.apache.hadoop.yarn.security.A
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManager;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManagerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.SyncDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceListener;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
@@ -101,7 +103,8 @@ public class ResourceManager extends Com
FifoScheduler.class, ResourceScheduler.class),
this.conf);
this.scheduler.reinitialize(this.conf, this.containerTokenSecretManager);
-
+ /* add the scheduler to be notified of events from the applications managers */
+ this.asmContext.getDispatcher().register(ApplicationTrackerEventType.class, this.scheduler);
//TODO change this to be random
this.appTokenSecretManager.setMasterKey(ApplicationTokenSecretManager
.createSecretKey("Dummy".getBytes()));
@@ -109,8 +112,7 @@ public class ResourceManager extends Com
applicationsManager = createApplicationsManagerImpl();
addService(applicationsManager);
- rmResourceTracker = createRMResourceTracker();
- rmResourceTracker.register(this.scheduler);
+ rmResourceTracker = createRMResourceTracker(this.scheduler);
addService(rmResourceTracker);
clientRM = createClientRMService();
@@ -165,8 +167,8 @@ public class ResourceManager extends Com
super.stop();
}
- protected RMResourceTrackerImpl createRMResourceTracker() {
- return new RMResourceTrackerImpl(this.containerTokenSecretManager);
+ protected RMResourceTrackerImpl createRMResourceTracker(ResourceListener listener) {
+ return new RMResourceTrackerImpl(this.containerTokenSecretManager, listener);
}
protected ApplicationsManagerImpl createApplicationsManagerImpl() {
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java Thu Mar 24 07:52:35 2011
@@ -59,7 +59,9 @@ public class AMTracker extends AbstractS
private long amExpiryInterval;
@SuppressWarnings("rawtypes")
private EventHandler handler;
-
+
+ private int amMaxRetries;
+
private final ASMContext asmContext;
private final Map<ApplicationID, ApplicationMasterInfo> applications =
@@ -93,6 +95,10 @@ public class AMTracker extends AbstractS
this.handler = asmContext.getDispatcher().getEventHandler();
this.amExpiryInterval = conf.getLong(YarnConfiguration.AM_EXPIRY_INTERVAL,
YarnConfiguration.DEFAULT_AM_EXPIRY_INTERVAL);
+ LOG.info("AM expiry interval: " + this.amExpiryInterval);
+ this.amMaxRetries = conf.getInt(YarnConfiguration.AM_MAX_RETRIES,
+ YarnConfiguration.DEFAULT_AM_MAX_RETRIES);
+ LOG.info("AM max retries: " + this.amMaxRetries);
this.asmContext.getDispatcher().register(ApplicationEventType.class, this);
}
@@ -161,7 +167,6 @@ public class AMTracker extends AbstractS
synchronized (applications) {
am = applications.get(app);
}
-
handler.handle(new ASMEvent<ApplicationEventType>
(ApplicationEventType.EXPIRE, am));
}
@@ -208,6 +213,8 @@ public class AMTracker extends AbstractS
return masterInfo;
}
+ /* As of now we don't remove applications from the RM */
+ /* TODO we need to decide on a strategy for expiring done applications */
public void remove(ApplicationID applicationId) {
synchronized (applications) {
applications.remove(applicationId);
@@ -297,6 +304,11 @@ public class AMTracker extends AbstractS
public String getQueue() {
throw notimplemented;
}
+
+ @Override
+ public int getFailedCount() {
+ throw notimplemented;
+ }
}
public void heartBeat(ApplicationStatus status) {
@@ -347,5 +359,17 @@ public class AMTracker extends AbstractS
amExpiryQueue.add(masterInfo.getStatus());
}
}
+
+ /* check to see if the AM is in an EXPIRED_PENDING state and start off the cycle again */
+ if (masterInfo.getState() == ApplicationState.EXPIRED_PENDING) {
+ /* check to see if the number of retries are reached or not */
+ if (masterInfo.getFailedCount() < this.amMaxRetries) {
+ handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.ALLOCATE,
+ masterInfo));
+ } else {
+ handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.
+ FAILED_MAX_RETRIES, masterInfo));
+ }
+ }
}
}
\ No newline at end of file
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AppContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AppContext.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AppContext.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AppContext.java Thu Mar 24 07:52:35 2011
@@ -91,4 +91,10 @@ public interface AppContext {
* @return the queue for this application
*/
public String getQueue();
+
+ /**
+ * the count of number of times the AM has expired/failed.
+ * @return the count of number of times the AM has expired/failed.
+ */
+ public int getFailedCount();
}
\ No newline at end of file
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterInfo.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterInfo.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterInfo.java Thu Mar 24 07:52:35 2011
@@ -18,6 +18,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -55,12 +58,18 @@ public class ApplicationMasterInfo imple
private final EventHandler handler;
private Container masterContainer;
final private String user;
+ private int numFailed = 0;
+ /* the list of nodes that this AM was launched on */
+ List<String> hostNamesLaunched = new ArrayList<String>();
/* this transition is too generalized, needs to be broken up as and when we
* keeping adding states. This will keep evolving and is not final yet.
*/
private final KillTransition killTransition = new KillTransition();
private final StatusUpdateTransition statusUpdatetransition = new StatusUpdateTransition();
-
+ private final ExpireTransition expireTransition = new ExpireTransition();
+ private final FailedTransition failedTransition = new FailedTransition();
+ private final AllocateTransition allocateTransition = new AllocateTransition();
+
private final StateMachine<ApplicationState, ApplicationEventType,
ASMEvent<ApplicationEventType>> stateMachine;
@@ -72,8 +81,11 @@ public class ApplicationMasterInfo imple
(ApplicationState.PENDING)
.addTransition(ApplicationState.PENDING, ApplicationState.ALLOCATING,
- ApplicationEventType.ALLOCATE, new AllocateTransition())
-
+ ApplicationEventType.ALLOCATE, allocateTransition)
+
+ .addTransition(ApplicationState.EXPIRED_PENDING, ApplicationState.ALLOCATING,
+ ApplicationEventType.ALLOCATE, allocateTransition)
+
.addTransition(ApplicationState.PENDING, ApplicationState.CLEANUP,
ApplicationEventType.KILL, killTransition)
@@ -99,14 +111,23 @@ public class ApplicationMasterInfo imple
ApplicationEventType.KILL, killTransition)
.addTransition(ApplicationState.LAUNCHED, ApplicationState.FAILED,
- ApplicationEventType.EXPIRE, new ExpireTransition())
+ ApplicationEventType.EXPIRE, expireTransition)
.addTransition(ApplicationState.LAUNCHED, ApplicationState.RUNNING,
ApplicationEventType.REGISTERED, new RegisterTransition())
-
- .addTransition(ApplicationState.RUNNING, ApplicationState.FAILED,
- ApplicationEventType.EXPIRE, new ExpireTransition())
+ /* for now we assume that acting on expiry is synchronous and we do not
+ * have to wait for cleanup acks from scheduler negotiator and launcher.
+ */
+ .addTransition(ApplicationState.LAUNCHED, ApplicationState.EXPIRED_PENDING,
+ ApplicationEventType.EXPIRE, expireTransition)
+
+ .addTransition(ApplicationState.RUNNING, ApplicationState.EXPIRED_PENDING,
+ ApplicationEventType.EXPIRE, expireTransition)
+
+ .addTransition(ApplicationState.EXPIRED_PENDING, ApplicationState.FAILED,
+ ApplicationEventType.FAILED_MAX_RETRIES, failedTransition)
+
.addTransition(ApplicationState.RUNNING, ApplicationState.COMPLETED,
ApplicationEventType.FINISH, new DoneTransition())
@@ -186,6 +207,11 @@ public class ApplicationMasterInfo imple
}
@Override
+ public synchronized int getFailedCount() {
+ return numFailed;
+ }
+
+ @Override
public String getName() {
return submissionContext.applicationName.toString();
}
@@ -218,6 +244,8 @@ public class ApplicationMasterInfo imple
ASMEvent<ApplicationEventType> event) {
masterInfo.handler.handle(new ASMEvent<SNEventType>(SNEventType.CLEANUP, masterInfo));
masterInfo.handler.handle(new ASMEvent<AMLauncherEventType>(AMLauncherEventType.CLEANUP, masterInfo));
+ masterInfo.handler.handle(new ASMEvent<ApplicationTrackerEventType>(ApplicationTrackerEventType.REMOVE,
+ masterInfo));
}
}
@@ -253,6 +281,7 @@ public class ApplicationMasterInfo imple
AMLauncherEventType.CLEANUP, masterInfo));
masterInfo.handler.handle(new ASMEvent<ApplicationTrackerEventType>(
ApplicationTrackerEventType.REMOVE, masterInfo));
+ masterInfo.numFailed++;
}
}
@@ -302,16 +331,13 @@ public class ApplicationMasterInfo imple
/* transition to finishing state on a cleanup, for now its not used, but will need it
* later */
- private static class FinishTransition implements
+ private static class FailedTransition implements
SingleArcTransition<ApplicationMasterInfo, ASMEvent<ApplicationEventType>> {
@Override
public void transition(ApplicationMasterInfo masterInfo,
ASMEvent<ApplicationEventType> event) {
- masterInfo.handler.handle(new ASMEvent<SNEventType>(
- SNEventType.CLEANUP, masterInfo));
- masterInfo.handler.handle(new ASMEvent<AMLauncherEventType>(
- AMLauncherEventType.CLEANUP, masterInfo));
+ LOG.info("Failed application: " + masterInfo.getApplicationID());
}
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java Thu Mar 24 07:52:35 2011
@@ -178,9 +178,6 @@ public class ApplicationsManagerImpl ext
amTracker.addMaster(user, context, clientTokenStr);
// TODO this should happen via dispatcher. should move it out to scheudler
// negotiator.
- scheduler.addApplication(applicationId, user,
- (context.queue == null? "default" : context.queue.toString()),
- context.priority);
/* schedule */
LOG.info("Application with id " + applicationId.id + " submitted by user " +
user + " with " + context);
@@ -197,7 +194,6 @@ public class ApplicationsManagerImpl ext
throws IOException {
/* remove the applicaiton from the scheduler for now. Later scheduler should
* be a event handler of adding and cleaning up appications*/
- scheduler.removeApplication(applicationId);
amTracker.kill(applicationId);
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java Thu Mar 24 07:52:35 2011
@@ -282,5 +282,10 @@ class SchedulerNegotiator extends Abstra
public String getQueue() {
throw notImplementedException;
}
+
+ @Override
+ public int getFailedCount() {
+ throw notImplementedException;
+ }
}
}
\ No newline at end of file
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/events/ApplicationMasterEvents.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/events/ApplicationMasterEvents.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/events/ApplicationMasterEvents.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/events/ApplicationMasterEvents.java Thu Mar 24 07:52:35 2011
@@ -52,6 +52,7 @@ public class ApplicationMasterEvents {
LAUNCH,
LAUNCHED,
FAILED,
+ FAILED_MAX_RETRIES,
ALLOCATED,
CLEANUP,
FINISH,
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java Thu Mar 24 07:52:35 2011
@@ -29,7 +29,6 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.atomic.AtomicInteger;
import javax.crypto.SecretKey;
-import javax.xml.crypto.NodeSetData;
import org.apache.avro.ipc.AvroRemoteException;
import org.apache.avro.ipc.Server;
@@ -43,14 +42,6 @@ import org.apache.hadoop.net.NetworkTopo
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.SecurityInfo;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.RMNMSecurityInfoClass;
-import org.apache.hadoop.yarn.server.YarnServerConfig;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceListener;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker.NodeResponse;
-import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
-import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.HeartbeatResponse;
import org.apache.hadoop.yarn.NodeID;
import org.apache.hadoop.yarn.NodeStatus;
@@ -58,6 +49,14 @@ import org.apache.hadoop.yarn.Registrati
import org.apache.hadoop.yarn.Resource;
import org.apache.hadoop.yarn.ResourceTracker;
import org.apache.hadoop.yarn.YarnClusterMetrics;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.RMNMSecurityInfoClass;
+import org.apache.hadoop.yarn.server.YarnServerConfig;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceListener;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+import org.apache.hadoop.yarn.service.AbstractService;
/**
* This class is responsible for the interaction with the NodeManagers.
@@ -65,7 +64,7 @@ import org.apache.hadoop.yarn.YarnCluste
*`
*/
public class RMResourceTrackerImpl extends AbstractService implements
-ResourceTracker, RMResourceTracker, ResourceContext {
+ResourceTracker, ResourceContext {
private static final Log LOG = LogFactory.getLog(RMResourceTrackerImpl.class);
/* we dont garbage collect on nodes. A node can come back up again and re register,
* so no use garbage collecting. Though admin can break the RM by bouncing
@@ -99,11 +98,13 @@ ResourceTracker, RMResourceTracker, Reso
private static final HeartbeatResponse reboot = new HeartbeatResponse();
private long nmExpiryInterval;
- public RMResourceTrackerImpl(ContainerTokenSecretManager containerTokenSecretManager) {
+ public RMResourceTrackerImpl(ContainerTokenSecretManager containerTokenSecretManager,
+ ResourceListener listener) {
super(RMResourceTrackerImpl.class.getName());
reboot.reboot = true;
this.containerTokenSecretManager = containerTokenSecretManager;
this.heartbeatThread = new HeartBeatThread();
+ this.resourceListener = listener;
}
@Override
@@ -253,19 +254,6 @@ ResourceTracker, RMResourceTracker, Reso
}
@Override
- public synchronized void register(ResourceListener listener) {
- //for now there is only one resource listener, so we dont
- //really add it to a list.
- this.resourceListener = listener;
- }
-
- @Override
- public synchronized void unregister(ResourceListener listener) {
- //TODO make the listener so that it dumps to a void listener
- //rather than nullifying it.
- }
-
- @Override
public List<NodeInfo> getAllNodeInfo() {
List<NodeInfo> infoList = new ArrayList<NodeInfo>();
synchronized (nodeManagers) {
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ResourceContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ResourceContext.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ResourceContext.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ResourceContext.java Thu Mar 24 07:52:35 2011
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.re
import java.util.List;
+import org.apache.hadoop.yarn.YarnClusterMetrics;
+
/**
* The read-only interface for cluster resource
*/
@@ -29,4 +31,10 @@ public interface ResourceContext {
* @return a list of node info
*/
List<NodeInfo> getAllNodeInfo();
+
+ /**
+ * Get cluster metrics from the resource tracker.
+ * @return the cluster metrics for the cluster.
+ */
+ YarnClusterMetrics getClusterMetrics();
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java Thu Mar 24 07:52:35 2011
@@ -32,7 +32,6 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker.NodeResponse;
import org.apache.hadoop.yarn.ApplicationID;
import org.apache.hadoop.yarn.Container;
import org.apache.hadoop.yarn.ContainerID;
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeResponse.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeResponse.java?rev=1084866&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeResponse.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeResponse.java Thu Mar 24 07:52:35 2011
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.Container;
+
+/**
+ * Encapsulates the response produced when an update from a node manager
+ * is processed: the applications that have finished, the containers that
+ * have completed and the containers that are to be cleaned up.
+ * The lists are stored and handed back by reference; no copies are made.
+ */
+public class NodeResponse {
+ private final List<Container> completed; // returned by getCompletedContainers()
+ private final List<Container> toCleanUp; // returned by getContainersToCleanUp()
+ private final List<ApplicationID> finishedApplications; // returned by getFinishedApplications()
+
+ /**
+ * @param finishedApplications ids of the applications that have finished
+ * @param completed containers that have completed
+ * @param toKill containers to clean up; exposed through
+ * {@link #getContainersToCleanUp()}
+ */
+ public NodeResponse(List<ApplicationID> finishedApplications,
+ List<Container> completed, List<Container> toKill) {
+ this.finishedApplications = finishedApplications;
+ this.completed = completed;
+ this.toCleanUp = toKill;
+ }
+ /** @return the finished-applications list given to the constructor */
+ public List<ApplicationID> getFinishedApplications() {
+ return this.finishedApplications;
+ }
+ /** @return the completed-containers list given to the constructor */
+ public List<Container> getCompletedContainers() {
+ return this.completed;
+ }
+ /** @return the list passed to the constructor as <code>toKill</code> */
+ public List<Container> getContainersToCleanUp() {
+ return this.toCleanUp;
+ }
+}
\ No newline at end of file
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java Thu Mar 24 07:52:35 2011
@@ -25,7 +25,6 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker.NodeResponse;
import org.apache.hadoop.yarn.Container;
import org.apache.hadoop.yarn.NodeID;
import org.apache.hadoop.yarn.Resource;
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java Thu Mar 24 07:52:35 2011
@@ -23,6 +23,9 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
/**
@@ -32,7 +35,8 @@ import org.apache.hadoop.yarn.server.sec
*/
@LimitedPrivate("yarn")
@Evolving
-public interface ResourceScheduler extends ResourceListener, YarnScheduler {
+public interface ResourceScheduler extends ResourceListener, YarnScheduler,
+ EventHandler<ASMEvent<ApplicationTrackerEventType>> {
/**
* Re-initialize the <code>ResourceScheduler</code>.
* @param conf configuration
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Thu Mar 24 07:52:35 2011
@@ -23,7 +23,6 @@ import java.util.List;
import org.apache.hadoop.yarn.ApplicationID;
import org.apache.hadoop.yarn.Container;
-import org.apache.hadoop.yarn.Priority;
import org.apache.hadoop.yarn.ResourceRequest;
/**
@@ -43,21 +42,4 @@ public interface YarnScheduler {
List<Container> allocate(ApplicationID applicationId,
List<ResourceRequest> ask, List<Container> release)
throws IOException;
- /**
- * A new application has been submitted to the ResourceManager
- * @param applicationId application which has been submitted
- * @param user application user
- * @param queue queue to which the applications is being submitted
- * @param priority application priority
- */
- public void addApplication(ApplicationID applicationId, String user,
- String queue, Priority priority)
- throws IOException;
-
- /**
- * A submitted application has completed.
- * @param applicationId completed application
- */
- public void removeApplication(ApplicationID applicationId)
- throws IOException;
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Thu Mar 24 07:52:35 2011
@@ -1,26 +1,27 @@
/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -34,19 +35,20 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTrackerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker.NodeResponse;
-import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.ApplicationID;
import org.apache.hadoop.yarn.Container;
import org.apache.hadoop.yarn.NodeID;
import org.apache.hadoop.yarn.Priority;
import org.apache.hadoop.yarn.Resource;
import org.apache.hadoop.yarn.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
@LimitedPrivate("yarn")
@Evolving
@@ -54,9 +56,9 @@ public class CapacityScheduler
implements ResourceScheduler, CapacitySchedulerContext {
private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
-
+
private Queue root;
-
+
private final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
@@ -68,39 +70,31 @@ implements ResourceScheduler, CapacitySc
} else if (q1.getUtilization() > q2.getUtilization()) {
return 1;
}
-
+
return q1.getQueuePath().compareTo(q2.getQueuePath());
}
};
private final Comparator<Application> applicationComparator =
new Comparator<Application>() {
- @Override
- public int compare(Application a1, Application a2) {
- return a1.getApplicationId().id - a2.getApplicationId().id;
- }
+ @Override
+ public int compare(Application a1, Application a2) {
+ return a1.getApplicationId().id - a2.getApplicationId().id;
+ }
};
-
+
private CapacitySchedulerConfiguration conf;
private ContainerTokenSecretManager containerTokenSecretManager;
-
+
private Map<String, Queue> queues = new ConcurrentHashMap<String, Queue>();
-
- private final ClusterTracker clusterTracker;
-
+
+
private Resource minimumAllocation;
-
+
private Map<ApplicationID, Application> applications =
new TreeMap<ApplicationID, Application>(
new org.apache.hadoop.yarn.server.resourcemanager.resource.ApplicationID.Comparator());
- public CapacityScheduler() {
- this.clusterTracker = createClusterTracker();
- }
-
- protected ClusterTracker createClusterTracker() {
- return new ClusterTrackerImpl();
- }
public Queue getRootQueue() {
return root;
@@ -110,7 +104,7 @@ implements ResourceScheduler, CapacitySc
public CapacitySchedulerConfiguration getConfiguration() {
return conf;
}
-
+
@Override
public ContainerTokenSecretManager getContainerTokenSecretManager() {
return containerTokenSecretManager;
@@ -127,22 +121,22 @@ implements ResourceScheduler, CapacitySc
this.conf = new CapacitySchedulerConfiguration(conf);
this.minimumAllocation = this.conf.getMinimumAllocation();
this.containerTokenSecretManager = containerTokenSecretManager;
-
+
initializeQueues(this.conf);
}
@Private
public static final String ROOT = "root";
-
+
@Private
public static final String ROOT_QUEUE =
CapacitySchedulerConfiguration.PREFIX + ROOT;
-
+
private void initializeQueues(CapacitySchedulerConfiguration conf) {
root = parseQueue(conf, null, ROOT);
LOG.info("Initialized root queue " + root);
}
-
+
private Queue parseQueue(CapacitySchedulerConfiguration conf,
Queue parent, String queueName) {
Queue queue;
@@ -162,29 +156,37 @@ implements ResourceScheduler, CapacitySc
parentQueue,
childQueueName);
childQueues.add(childQueue);
-
+
queues.put(childQueueName, childQueue);
}
parentQueue.setChildQueues(childQueues);
-
+
queue = parentQueue;
}
-
+
LOG.info("Initialized queue: " + queue);
return queue;
}
- @Override
+ /**
+ * Add an application to the capacity scheduler. This application needs to be
+ * tracked.
+ * @param applicationId the application id of this application
+ * @param user the user who owns the application
+ * @param queueName the queue which the application belongs to
+ * @param priority the priority of the application
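+ * @throws IOException if the queue is unknown or is not a leaf queue, or if
+ * an AccessControlException is raised while submitting the application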
+ */
public void addApplication(ApplicationID applicationId,
String user, String queueName, Priority priority)
throws IOException {
Queue queue = queues.get(queueName);
-
+
if (queue == null) {
throw new IOException("Application " + applicationId +
" submitted by user " + user + " to unknown queue: " + queueName);
}
-
+
if (!(queue instanceof LeafQueue)) {
throw new IOException("Application " + applicationId +
" submitted by user " + user + " to non-leaf queue: " + queueName);
@@ -196,7 +198,7 @@ implements ResourceScheduler, CapacitySc
} catch (AccessControlException ace) {
throw new IOException(ace);
}
-
+
applications.put(applicationId, application);
LOG.info("Application Submission: " + applicationId.id +
@@ -205,14 +207,19 @@ implements ResourceScheduler, CapacitySc
", currently active: " + applications.size());
}
- @Override
+ /**
+ * Remove an application. Releases the resources of the application and
+ * then makes sure it is removed from the data structures of the scheduler.
+ * @param applicationId the applicationId of the application
+ * @throws IOException
+ */
public void removeApplication(ApplicationID applicationId)
- throws IOException {
+ throws IOException {
Application application = getApplication(applicationId);
-
+
if (application == null) {
-// throw new IOException("Unknown application " + applicationId +
-// " has completed!");
+ // throw new IOException("Unknown application " + applicationId +
+ // " has completed!");
LOG.info("Unknown application " + applicationId + " has completed!");
return;
}
@@ -221,30 +228,19 @@ implements ResourceScheduler, CapacitySc
Queue queue = queues.get(application.getQueue().getQueueName());
LOG.info("DEBUG --- removeApplication - appId: " + applicationId + " queue: " + queue);
queue.finishApplication(application, queue.getQueueName());
-
+
// Release containers and update queue capacities
processReleasedContainers(application, application.getCurrentContainers());
-
+
// Inform all NodeManagers about completion of application
- clusterTracker.finishedApplication(applicationId,
+ finishedApplication(applicationId,
application.getAllNodesForApplication());
-
+
// Remove from our data-structure
applications.remove(applicationId);
}
@Override
- public NodeInfo addNode(NodeID nodeId,String hostName,
- Node node, Resource capability) {
- return clusterTracker.addNode(nodeId, hostName, node, capability);
- }
-
- @Override
- public void removeNode(NodeInfo node) {
- clusterTracker.removeNode(node);
- }
-
- @Override
public List<Container> allocate(ApplicationID applicationId,
List<ResourceRequest> ask, List<Container> release)
throws IOException {
@@ -264,10 +260,10 @@ implements ResourceScheduler, CapacitySc
// Update application requests
application.updateResourceRequests(ask);
-
+
// Release ununsed containers and update queue capacities
processReleasedContainers(application, release);
-
+
LOG.info("DEBUG --- allocate: post-update");
application.showRequests();
@@ -285,29 +281,29 @@ implements ResourceScheduler, CapacitySc
normalizeRequest(ask);
}
}
-
+
private void normalizeRequest(ResourceRequest ask) {
int memory = ask.capability.memory;
int minMemory = minimumAllocation.memory;
ask.capability.memory =
minMemory * ((memory/minMemory) + (memory%minMemory > 0 ? 1 : 0));
}
-
+
@Override
public synchronized NodeResponse nodeUpdate(NodeInfo node,
Map<CharSequence,List<Container>> containers ) {
-
+
LOG.info("nodeUpdate: " + node);
-
- NodeResponse nodeResponse = clusterTracker.nodeUpdate(node, containers);
+
+ NodeResponse nodeResponse = nodeUpdateInternal(node, containers);
// Completed containers
processCompletedContainers(nodeResponse.getCompletedContainers());
-
+ NodeManager nm = nodes.get(node.getHostName());
// Assign new containers
- root.assignContainers(clusterTracker, node);
-
+ root.assignContainers(clusterResource, nm);
+
return nodeResponse;
}
@@ -322,11 +318,11 @@ implements ResourceScheduler, CapacitySc
// Inform the queue
LeafQueue queue = (LeafQueue)application.getQueue();
- queue.completedContainer(clusterTracker, container, application);
+ queue.completedContainer(clusterResource, container, application);
}
}
}
-
+
private synchronized void processReleasedContainers(Application application,
List<Container> releasedContainers) {
// Inform the application
@@ -335,7 +331,7 @@ implements ResourceScheduler, CapacitySc
// Inform clusterTracker
List<Container> unusedContainers = new ArrayList<Container>();
for (Container container : releasedContainers) {
- if (clusterTracker.releaseContainer(
+ if (releaseContainer(
application.getApplicationId(),
container)) {
unusedContainers.add(container);
@@ -345,9 +341,95 @@ implements ResourceScheduler, CapacitySc
// Update queue capacities
processCompletedContainers(unusedContainers);
}
-
+
private synchronized Application getApplication(ApplicationID applicationId) {
return applications.get(applicationId);
}
+ @Override
+ public synchronized void handle(ASMEvent<ApplicationTrackerEventType> event) {
+ switch(event.getType()) {
+ case ADD:
+ try {
+ addApplication(event.getAppContext().getApplicationID(),
+ event.getAppContext().getUser(), event.getAppContext().getQueue(),
+ event.getAppContext().getSubmissionContext().priority);
+ } catch(IOException ie) {
+ LOG.error("Error in adding an application to the scheduler", ie);
+ //TODO do proper error handling to shutdown the Resource Manager is we
+ // are not able to handle this.
+ }
+ break;
+ case REMOVE:
+ try {
+ removeApplication(event.getAppContext().getApplicationID());
+ } catch(IOException ie) {
+ LOG.error("Error in removing application", ie);
+ //TODO have to be shutdown the RM in case of this.
+ // do a graceful shutdown.
+ }
+ break;
+ }
+ }
+
+ // Node managers known to the scheduler, keyed by host name (see addNode).
+ // Every access visible in this class happens inside a synchronized method.
+ private Map<String, NodeManager> nodes = new HashMap<String, NodeManager>();
+ // Cluster-wide resource total: addNode passes it to addResource and
+ // removeNode passes it to subtractResource.
+ private Resource clusterResource = new Resource();
+
+
+  /**
+   * Returns the scheduler's cluster resource total. The object handed back
+   * is the scheduler's own instance, not a copy.
+   *
+   * @return the cluster resource tracked by this scheduler
+   */
+  public synchronized Resource getClusterResource() {
+    return this.clusterResource;
+  }
+
+  /**
+   * Stops tracking a node: its total capability is passed to
+   * subtractResource together with the cluster resource, then the entry
+   * for its host name is dropped from the node map.
+   * NOTE(review): the subtraction happens even when no entry exists for
+   * the host name -- confirm callers never remove an untracked node.
+   *
+   * @param nodeInfo the node to remove
+   */
+  @Override
+  public synchronized void removeNode(NodeInfo nodeInfo) {
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource
+        .subtractResource(this.clusterResource, nodeInfo.getTotalCapability());
+    this.nodes.remove(nodeInfo.getHostName());
+  }
+
+  /**
+   * Tells whether a node manager is registered under the node's host name.
+   *
+   * @param nodeInfo the node to look up
+   * @return true if the node map holds a non-null entry for the host name
+   */
+  public synchronized boolean isTracked(NodeInfo nodeInfo) {
+    return nodes.get(nodeInfo.getHostName()) != null;
+  }
+
+  /**
+   * Creates a NodeManager for the given node, records it under its host
+   * name and passes its total capability to addResource together with the
+   * cluster resource.
+   * NOTE(review): an existing entry for the same host name is replaced
+   * while its capability stays counted -- confirm how re-registration of
+   * a node is expected to behave.
+   *
+   * @param nodeId id of the node
+   * @param hostName host name of the node
+   * @param node network-topology node
+   * @param capability resource capability of the node
+   * @return the newly created NodeManager
+   */
+  @Override
+  public synchronized NodeInfo addNode(NodeID nodeId,
+      String hostName, Node node, Resource capability) {
+    NodeManager tracked = new NodeManager(nodeId, hostName, node, capability);
+    this.nodes.put(tracked.getHostName(), tracked);
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource
+        .addResource(this.clusterResource, tracked.getTotalCapability());
+    return tracked;
+  }
+
+  /**
+   * Releases a container on the node manager registered under the
+   * container's host name.
+   *
+   * @param applicationId the application releasing the container
+   * @param container the container being released
+   * @return the result of NodeManager.releaseContainer, or false when no
+   *         node manager is tracked for the container's host
+   */
+  public synchronized boolean releaseContainer(ApplicationID applicationId,
+      Container container) {
+    // Reap containers
+    LOG.info("Application " + applicationId + " released container " + container);
+    NodeManager nodeManager = nodes.get(container.hostName.toString());
+    if (nodeManager == null) {
+      // The node may have been dropped by removeNode while the application
+      // still held the container; report "nothing released" instead of
+      // failing the caller with a NullPointerException.
+      LOG.warn("Application " + applicationId + " released container "
+          + container + " on untracked node " + container.hostName);
+      return false;
+    }
+    return nodeManager.releaseContainer(container);
+  }
+
+  /**
+   * Forwards a node's container report to the NodeManager registered under
+   * the node's host name.
+   *
+   * @param nodeInfo the node that sent the update
+   * @param containers the containers reported by the node; passed
+   *        unchanged to NodeManager.statusUpdate
+   * @return the response produced by NodeManager.statusUpdate
+   */
+  public synchronized NodeResponse nodeUpdateInternal(NodeInfo nodeInfo,
+      Map<CharSequence,List<Container>> containers) {
+    NodeManager node = nodes.get(nodeInfo.getHostName());
+    // nodeUpdate calls this for every update it receives, so skip building
+    // the message unless debug logging is actually enabled.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("nodeUpdate: node=" + nodeInfo.getHostName() +
+          " available=" + nodeInfo.getAvailableResource().memory);
+    }
+    return node.statusUpdate(containers);
+  }
+
+  /**
+   * Records containers allocated to an application on the node manager
+   * registered under the node's host name.
+   *
+   * @param nodeInfo the node on which the containers were allocated
+   * @param applicationId the application that owns the containers
+   * @param containers the allocated containers
+   */
+  public synchronized void addAllocatedContainers(NodeInfo nodeInfo,
+      ApplicationID applicationId, List<Container> containers) {
+    nodes.get(nodeInfo.getHostName())
+        .allocateContainer(applicationId, containers);
+  }
+
+  /**
+   * Notifies the node managers of the given nodes that an application has
+   * finished. Nodes with no tracked node manager are skipped, so that one
+   * missing node cannot abort the loop and keep removeApplication from
+   * reaching the cleanup that follows this call.
+   *
+   * @param applicationId the application that has finished
+   * @param nodesToNotify the nodes to be told about the completion
+   */
+  public synchronized void finishedApplication(ApplicationID applicationId,
+      List<NodeInfo> nodesToNotify) {
+    for (NodeInfo node : nodesToNotify) {
+      NodeManager nodeManager = nodes.get(node.getHostName());
+      if (nodeManager == null) {
+        LOG.info("Not notifying untracked node " + node.getHostName()
+            + " about finished application " + applicationId);
+        continue;
+      }
+      nodeManager.notifyFinishedApplication(applicationId);
+    }
+  }
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Thu Mar 24 07:52:35 2011
@@ -33,18 +33,17 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.ContainerToken;
+import org.apache.hadoop.yarn.Priority;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.ResourceRequest;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
-import org.apache.hadoop.yarn.Container;
-import org.apache.hadoop.yarn.ContainerToken;
-import org.apache.hadoop.yarn.Priority;
-import org.apache.hadoop.yarn.Resource;
-import org.apache.hadoop.yarn.ResourceRequest;
@Private
@Unstable
@@ -277,7 +276,7 @@ public class LeafQueue implements Queue
@Override
public synchronized Resource
- assignContainers(ClusterTracker cluster, NodeInfo node) {
+ assignContainers(Resource clusterResource, NodeManager node) {
LOG.info("DEBUG --- assignContainers:" +
" node=" + node.getHostName() +
@@ -303,19 +302,19 @@ public class LeafQueue implements Queue
if (required != null && required.numContainers > 0) {
// Maximum Capacity of the queue
- if (!assignToQueue(cluster, required.capability)) {
+ if (!assignToQueue(clusterResource, required.capability)) {
return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
}
// User limits
- if (!assignToUser(application.getUser(), cluster, required.capability)) {
+ if (!assignToUser(application.getUser(), clusterResource, required.capability)) {
return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
}
}
Resource assigned =
- assignContainersOnNode(cluster, node, application, priority);
+ assignContainersOnNode(clusterResource, node, application, priority);
if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
assigned,
@@ -324,7 +323,7 @@ public class LeafQueue implements Queue
application.getResourceRequest(priority, NodeManager.ANY).capability;
// Book-keeping
- allocateResource(cluster.getClusterResource(),
+ allocateResource(clusterResource,
application.getUser(), assignedResource);
// Done
@@ -343,11 +342,11 @@ public class LeafQueue implements Queue
return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
}
- private synchronized boolean assignToQueue(ClusterTracker cluster,
+ private synchronized boolean assignToQueue(Resource clusterResource,
Resource required) {
float newUtilization =
(float)(usedResources.memory + required.memory) /
- (cluster.getClusterResource().memory * absoluteCapacity);
+ (clusterResource.memory * absoluteCapacity);
if (newUtilization > absoluteMaxCapacity) {
LOG.info(getQueueName() +
" current-capacity (" + getUtilization() + ") +" +
@@ -358,7 +357,7 @@ public class LeafQueue implements Queue
return true;
}
- private synchronized boolean assignToUser(String userName, ClusterTracker cluster,
+ private synchronized boolean assignToUser(String userName, Resource clusterResource,
Resource required) {
// What is our current capacity?
// * It is equal to the max(required, queue-capacity) if
@@ -370,7 +369,7 @@ public class LeafQueue implements Queue
// Allow progress for queues with miniscule capacity
final int queueCapacity =
Math.max(
- divideAndCeil((int)(absoluteCapacity * cluster.getClusterResource().memory),
+ divideAndCeil((int)(absoluteCapacity * clusterResource.memory),
minimumAllocation.memory),
required.memory);
@@ -425,14 +424,14 @@ public class LeafQueue implements Queue
return (offSwitchRequest.numContainers > 0);
}
- Resource assignContainersOnNode(ClusterTracker cluster, NodeInfo node,
+ Resource assignContainersOnNode(Resource clusterResource, NodeManager node,
Application application, Priority priority) {
Resource assigned =
org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
// Data-local
- assigned = assignNodeLocalContainers(cluster, node, application, priority);
+ assigned = assignNodeLocalContainers(clusterResource, node, application, priority);
if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
assigned,
org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
@@ -440,7 +439,7 @@ public class LeafQueue implements Queue
}
// Rack-local
- assigned = assignRackLocalContainers(cluster, node, application, priority);
+ assigned = assignRackLocalContainers(clusterResource, node, application, priority);
if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
assigned,
org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
@@ -448,16 +447,16 @@ public class LeafQueue implements Queue
}
// Off-switch
- return assignOffSwitchContainers(cluster, node, application, priority);
+ return assignOffSwitchContainers(clusterResource, node, application, priority);
}
- Resource assignNodeLocalContainers(ClusterTracker cluster, NodeInfo node,
+ Resource assignNodeLocalContainers(Resource clusterResource, NodeManager node,
Application application, Priority priority) {
ResourceRequest request =
application.getResourceRequest(priority, node.getHostName());
if (request != null) {
if (canAssign(application, priority, node, NodeType.DATA_LOCAL)) {
- return assignContainer(cluster, node, application, priority, request,
+ return assignContainer(clusterResource, node, application, priority, request,
NodeType.DATA_LOCAL);
}
}
@@ -465,13 +464,13 @@ public class LeafQueue implements Queue
return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
}
- Resource assignRackLocalContainers(ClusterTracker cluster, NodeInfo node,
+ Resource assignRackLocalContainers(Resource clusterResource, NodeManager node,
Application application, Priority priority) {
ResourceRequest request =
application.getResourceRequest(priority, node.getRackName());
if (request != null) {
if (canAssign(application, priority, node, NodeType.RACK_LOCAL)) {
- return assignContainer(cluster, node, application, priority, request,
+ return assignContainer(clusterResource, node, application, priority, request,
NodeType.RACK_LOCAL);
}
}
@@ -479,13 +478,13 @@ public class LeafQueue implements Queue
return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
}
- Resource assignOffSwitchContainers(ClusterTracker cluster, NodeInfo node,
+ Resource assignOffSwitchContainers(Resource clusterResource, NodeManager node,
Application application, Priority priority) {
ResourceRequest request =
application.getResourceRequest(priority, NodeManager.ANY);
if (request != null) {
if (canAssign(application, priority, node, NodeType.OFF_SWITCH)) {
- return assignContainer(cluster, node, application, priority, request,
+ return assignContainer(clusterResource, node, application, priority, request,
NodeType.OFF_SWITCH);
}
}
@@ -529,7 +528,7 @@ public class LeafQueue implements Queue
return false;
}
- private Resource assignContainer(ClusterTracker cluster, NodeInfo node,
+ private Resource assignContainer(Resource clusterResource, NodeManager node,
Application application,
Priority priority, ResourceRequest request, NodeType type) {
LOG.info("DEBUG --- assignContainers:" +
@@ -577,16 +576,14 @@ public class LeafQueue implements Queue
// Allocate container to the application
application.allocate(type, node, priority, request, containers);
- // Update resource usage on the node
- cluster.addAllocatedContainers(node, application.getApplicationId(),
- containers);
+ node.allocateContainer(application.getApplicationId(), containers);
LOG.info("allocatedContainer" +
" container=" + container +
" queue=" + this.toString() +
" util=" + getUtilization() +
" used=" + usedResources +
- " cluster=" + cluster.getClusterResource());
+ " cluster=" + clusterResource);
return container.resource;
}
@@ -595,7 +592,7 @@ public class LeafQueue implements Queue
}
@Override
- public void completedContainer(ClusterTracker cluster,
+ public void completedContainer(Resource clusterResource,
Container container, Application application) {
if (application != null) {
// Careful! Locking order is important!
@@ -604,7 +601,7 @@ public class LeafQueue implements Queue
application.completedContainer(container);
// Book-keeping
- releaseResource(cluster.getClusterResource(),
+ releaseResource(clusterResource,
application.getUser(), container.resource);
LOG.info("completedContainer" +
@@ -612,11 +609,11 @@ public class LeafQueue implements Queue
" queue=" + this +
" util=" + getUtilization() +
" used=" + usedResources +
- " cluster=" + cluster.getClusterResource());
+ " cluster=" + clusterResource);
}
// Inform the parent queue
- parent.completedContainer(cluster, container, application);
+ parent.completedContainer(clusterResource, container, application);
}
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Thu Mar 24 07:52:35 2011
@@ -31,12 +31,12 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker;
import org.apache.hadoop.yarn.Container;
import org.apache.hadoop.yarn.Priority;
import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
@Private
@Evolving
@@ -251,8 +251,8 @@ public class ParentQueue implements Queu
}
@Override
- public synchronized Resource assignContainers(ClusterTracker cluster,
- NodeInfo node) {
+ public synchronized Resource assignContainers(
+ Resource clusterResource, NodeManager node) {
Resource assigned =
org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(0);
@@ -269,14 +269,14 @@ public class ParentQueue implements Queu
}
// Schedule
- Resource assignedToChild = assignContainersToChildQueues(cluster, node);
+ Resource assignedToChild = assignContainersToChildQueues(clusterResource, node);
// Done if no child-queue assigned anything
if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
assignedToChild,
org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
// Track resource utilization for the parent-queue
- allocateResource(cluster.getClusterResource(), assignedToChild);
+ allocateResource(clusterResource, assignedToChild);
// Track resource utilization in this pass of the scheduler
org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
@@ -286,7 +286,7 @@ public class ParentQueue implements Queu
" queue=" + getQueueName() +
" util=" + getUtilization() +
" used=" + usedResources +
- " cluster=" + cluster.getClusterResource());
+ " cluster=" + clusterResource);
} else {
break;
@@ -317,8 +317,8 @@ public class ParentQueue implements Queu
minimumAllocation);
}
- synchronized Resource assignContainersToChildQueues(ClusterTracker cluster,
- NodeInfo node) {
+ synchronized Resource assignContainersToChildQueues(Resource cluster,
+ NodeManager node) {
Resource assigned =
org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(0);
@@ -362,24 +362,24 @@ public class ParentQueue implements Queu
}
@Override
- public void completedContainer(ClusterTracker cluster,
+ public void completedContainer(Resource clusterResource,
Container container, Application application) {
if (application != null) {
// Careful! Locking order is important!
// Book keeping
synchronized (this) {
- releaseResource(cluster.getClusterResource(), container.resource);
+ releaseResource(clusterResource, container.resource);
LOG.info("completedContainer" +
" queue=" + getQueueName() +
" util=" + getUtilization() +
" used=" + usedResources +
- " cluster=" + cluster.getClusterResource());
+ " cluster=" + clusterResource);
}
// Inform the parent
if (parent != null) {
- parent.completedContainer(cluster, container, application);
+ parent.completedContainer(clusterResource, container, application);
}
}
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java Thu Mar 24 07:52:35 2011
@@ -23,12 +23,12 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker;
import org.apache.hadoop.yarn.Container;
import org.apache.hadoop.yarn.Priority;
import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
/**
* Queue represents a node in the tree of
@@ -141,19 +141,19 @@ extends org.apache.hadoop.yarn.server.re
/**
* Assign containers to applications in the queue or it's children (if any).
- * @param cluster cluster resources
+ * @param clusterResource the resource of the cluster.
* @param node node on which resources are available
* @return
*/
- public Resource assignContainers(ClusterTracker cluster, NodeInfo node);
+ public Resource assignContainers(Resource clusterResource, NodeManager node);
/**
* A container assigned to the queue has completed.
- * @param cluster cluster resources
+ * @param clusterResource the resource of the cluster
* @param container completed container
* @param application application to which the container was assigned
*/
- public void completedContainer(ClusterTracker cluster,
+ public void completedContainer(Resource clusterResource,
Container container, Application application);
/**