You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by su...@apache.org on 2013/01/11 20:40:30 UTC
svn commit: r1432246 [2/3] - in
/hadoop/common/branches/HDFS-2802/hadoop-yarn-project: ./
hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop...
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Fri Jan 11 19:40:23 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,6 +37,10 @@ import org.apache.hadoop.yarn.event.Even
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -48,7 +53,8 @@ import org.apache.hadoop.yarn.server.sec
/**
* This class manages the list of applications for the resource manager.
*/
-public class RMAppManager implements EventHandler<RMAppManagerEvent> {
+public class RMAppManager implements EventHandler<RMAppManagerEvent>,
+ Recoverable {
private static final Log LOG = LogFactory.getLog(RMAppManager.class);
@@ -173,6 +179,10 @@ public class RMAppManager implements Eve
completedApps.add(applicationId);
writeAuditLog(applicationId);
+
+ // application completely done. Remove from state
+ RMStateStore store = rmContext.getStateStore();
+ store.removeApplication(rmContext.getRMApps().get(applicationId));
}
}
@@ -306,6 +316,37 @@ public class RMAppManager implements Eve
}
return credentials;
}
+
+ /**
+  * Re-populates the applications recorded in the given state.
+  * Applications with an unmanaged AM are not recovered; they are removed
+  * from the state store instead. Every other application is re-submitted and
+  * then asked to recover its own attempt information.
+  *
+  * @param state state previously loaded from the state store
+  * @throws Exception propagated from the calls made while re-submitting
+  */
+ @Override
+ public void recover(RMState state) throws Exception {
+   RMStateStore stateStore = rmContext.getStateStore();
+   assert stateStore != null;
+
+   // recover applications
+   Map<ApplicationId, ApplicationState> savedApps =
+       state.getApplicationState();
+   LOG.info("Recovering " + savedApps.size() + " applications");
+
+   for (ApplicationState savedApp : savedApps.values()) {
+     if (savedApp.getApplicationSubmissionContext().getUnmanagedAM()) {
+       // do not recover unmanaged applications since current recovery
+       // mechanism of restarting attempts does not work for them.
+       // This will need to be changed in work preserving recovery in which
+       // RM will re-connect with the running AM's instead of restarting them
+       LOG.info("Not recovering unmanaged application "
+           + savedApp.getAppId());
+       stateStore.removeApplication(savedApp);
+       continue;
+     }
+
+     // re-submit the application
+     // this is going to send an app start event but since the async
+     // dispatcher has not started that event will be queued until we have
+     // completed re-populating the state
+     LOG.info("Recovering application " + savedApp.getAppId());
+     submitApplication(savedApp.getApplicationSubmissionContext(),
+         savedApp.getSubmitTime());
+
+     // re-populate attempt information in application
+     RMApp resubmitted = rmContext.getRMApps().get(savedApp.getAppId());
+     ((RMAppImpl) resubmitted).recover(state);
+   }
+ }
@Override
public void handle(RMAppManagerEvent event) {
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java Fri Jan 11 19:40:23 2013
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMa
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
@@ -38,6 +39,8 @@ import org.apache.hadoop.yarn.server.res
public interface RMContext {
Dispatcher getDispatcher();
+
+ RMStateStore getStateStore();
ConcurrentMap<ApplicationId, RMApp> getRMApps();
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java Fri Jan 11 19:40:23 2013
@@ -23,7 +23,10 @@ import java.util.concurrent.ConcurrentMa
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
@@ -33,6 +36,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import com.google.common.annotations.VisibleForTesting;
+
public class RMContextImpl implements RMContext {
private final Dispatcher rmDispatcher;
@@ -48,6 +53,7 @@ public class RMContextImpl implements RM
private AMLivelinessMonitor amLivelinessMonitor;
private AMLivelinessMonitor amFinishingMonitor;
+ private RMStateStore stateStore = null;
private ContainerAllocationExpirer containerAllocationExpirer;
private final DelegationTokenRenewer tokenRenewer;
private final ApplicationTokenSecretManager appTokenSecretManager;
@@ -55,6 +61,7 @@ public class RMContextImpl implements RM
private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
public RMContextImpl(Dispatcher rmDispatcher,
+ RMStateStore store,
ContainerAllocationExpirer containerAllocationExpirer,
AMLivelinessMonitor amLivelinessMonitor,
AMLivelinessMonitor amFinishingMonitor,
@@ -63,6 +70,7 @@ public class RMContextImpl implements RM
RMContainerTokenSecretManager containerTokenSecretManager,
ClientToAMTokenSecretManagerInRM clientTokenSecretManager) {
this.rmDispatcher = rmDispatcher;
+ this.stateStore = store;
this.containerAllocationExpirer = containerAllocationExpirer;
this.amLivelinessMonitor = amLivelinessMonitor;
this.amFinishingMonitor = amFinishingMonitor;
@@ -71,11 +79,39 @@ public class RMContextImpl implements RM
this.containerTokenSecretManager = containerTokenSecretManager;
this.clientToAMTokenSecretManager = clientTokenSecretManager;
}
+
+ /**
+  * Helper constructor for tests. Builds the context without an explicit
+  * state store and installs a freshly initialized {@link NullRMStateStore}
+  * wired to the given dispatcher.
+  *
+  * @throws IllegalStateException if the null store cannot be initialized
+  */
+ @VisibleForTesting
+ public RMContextImpl(Dispatcher rmDispatcher,
+     ContainerAllocationExpirer containerAllocationExpirer,
+     AMLivelinessMonitor amLivelinessMonitor,
+     AMLivelinessMonitor amFinishingMonitor,
+     DelegationTokenRenewer tokenRenewer,
+     ApplicationTokenSecretManager appTokenSecretManager,
+     RMContainerTokenSecretManager containerTokenSecretManager,
+     ClientToAMTokenSecretManagerInRM clientTokenSecretManager) {
+   this(rmDispatcher, null, containerAllocationExpirer, amLivelinessMonitor,
+       amFinishingMonitor, tokenRenewer, appTokenSecretManager,
+       containerTokenSecretManager, clientTokenSecretManager);
+   RMStateStore nullStore = new NullRMStateStore();
+   nullStore.setDispatcher(rmDispatcher);
+   try {
+     nullStore.init(new YarnConfiguration());
+   } catch (Exception e) {
+     // Previously "assert false", which is a no-op unless the JVM runs with
+     // -ea and would leave the context silently without a state store.
+     throw new IllegalStateException(
+         "Failed to initialize NullRMStateStore", e);
+   }
+   setStateStore(nullStore);
+ }
@Override
public Dispatcher getDispatcher() {
return this.rmDispatcher;
}
+
+ /** Returns the state store currently held by this context. */
+ @Override
+ public RMStateStore getStateStore() {
+   return this.stateStore;
+ }
@Override
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
@@ -126,4 +162,9 @@ public class RMContextImpl implements RM
public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
return this.clientToAMTokenSecretManager;
}
+
+ /** Replaces the state store held by this context. */
+ @VisibleForTesting
+ public void setStateStore(RMStateStore store) {
+   this.stateStore = store;
+ }
}
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Fri Jan 11 19:40:23 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
@@ -45,10 +46,11 @@ import org.apache.hadoop.yarn.event.Even
import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -80,6 +82,8 @@ import org.apache.hadoop.yarn.webapp.Web
import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.hadoop.yarn.webapp.WebApps.Builder;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* The ResourceManager is the main class that is a set of components.
* "I am the ResourceManager. All your resources are belong to us..."
@@ -119,14 +123,13 @@ public class ResourceManager extends Com
protected RMDelegationTokenSecretManager rmDTSecretManager;
private WebApp webApp;
protected RMContext rmContext;
- private final RMStateStore store;
protected ResourceTrackerService resourceTracker;
+ private boolean recoveryEnabled;
private Configuration conf;
-
- public ResourceManager(RMStateStore store) {
+
+ public ResourceManager() {
super("ResourceManager");
- this.store = store;
}
public RMContext getRMContext() {
@@ -160,12 +163,34 @@ public class ResourceManager extends Com
this.containerTokenSecretManager = createContainerTokenSecretManager(conf);
+ boolean isRecoveryEnabled = conf.getBoolean(
+ YarnConfiguration.RECOVERY_ENABLED,
+ YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
+
+ RMStateStore rmStore = null;
+ if(isRecoveryEnabled) {
+ recoveryEnabled = true;
+ rmStore = RMStateStoreFactory.getStore(conf);
+ } else {
+ recoveryEnabled = false;
+ rmStore = new NullRMStateStore();
+ }
+ try {
+ rmStore.init(conf);
+ rmStore.setDispatcher(rmDispatcher);
+ } catch (Exception e) {
+ // the Exception from stateStore.init() needs to be handled for
+ // HA and we need to give up master status if we got fenced
+ LOG.error("Failed to init state store", e);
+ ExitUtil.terminate(1, e);
+ }
+
this.rmContext =
- new RMContextImpl(this.rmDispatcher,
+ new RMContextImpl(this.rmDispatcher, rmStore,
this.containerAllocationExpirer, amLivelinessMonitor,
amFinishingMonitor, tokenRenewer, this.appTokenSecretManager,
this.containerTokenSecretManager, this.clientToAMSecretManager);
-
+
// Register event handler for NodesListManager
this.nodesListManager = new NodesListManager(this.rmContext);
this.rmDispatcher.register(NodesListManagerEventType.class,
@@ -226,9 +251,15 @@ public class ResourceManager extends Com
addService(applicationMasterLauncher);
new RMNMInfo(this.rmContext, this.scheduler);
-
+
super.init(conf);
}
+
+ /**
+  * Installs the given store into the RM context after pointing the store at
+  * the RM dispatcher.
+  */
+ @VisibleForTesting
+ protected void setRMStateStore(RMStateStore rmStore) {
+   rmStore.setDispatcher(rmDispatcher);
+   RMContextImpl contextImpl = (RMContextImpl) rmContext;
+   contextImpl.setStateStore(rmStore);
+ }
protected RMContainerTokenSecretManager createContainerTokenSecretManager(
Configuration conf) {
@@ -502,6 +533,19 @@ public class ResourceManager extends Com
this.appTokenSecretManager.start();
this.containerTokenSecretManager.start();
+ if(recoveryEnabled) {
+ try {
+ RMStateStore rmStore = rmContext.getStateStore();
+ RMState state = rmStore.loadState();
+ recover(state);
+ } catch (Exception e) {
+ // the Exception from loadState() needs to be handled for
+ // HA and we need to give up master status if we got fenced
+ LOG.error("Failed to load/recover state", e);
+ ExitUtil.terminate(1, e);
+ }
+ }
+
startWepApp();
DefaultMetricsSystem.initialize("ResourceManager");
JvmMetrics.initSingleton("ResourceManager", null);
@@ -555,6 +599,13 @@ public class ResourceManager extends Com
DefaultMetricsSystem.shutdown();
+ RMStateStore store = rmContext.getStateStore();
+ try {
+ store.close();
+ } catch (Exception e) {
+ LOG.error("Error closing store.", e);
+ }
+
super.stop();
}
@@ -643,6 +694,8 @@ public class ResourceManager extends Com
@Override
public void recover(RMState state) throws Exception {
+ // recover applications
+ rmAppManager.recover(state);
}
public static void main(String argv[]) {
@@ -650,13 +703,11 @@ public class ResourceManager extends Com
StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
try {
Configuration conf = new YarnConfiguration();
- RMStateStore store = StoreFactory.getStore(conf);
- ResourceManager resourceManager = new ResourceManager(store);
+ ResourceManager resourceManager = new ResourceManager();
ShutdownHookManager.get().addShutdownHook(
new CompositeServiceShutdownHook(resourceManager),
SHUTDOWN_HOOK_PRIORITY);
resourceManager.init(conf);
- //resourceManager.recover(store.restore());
resourceManager.start();
} catch (Throwable t) {
LOG.fatal("Error starting ResourceManager", t);
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Fri Jan 11 19:40:23 2013
@@ -15,10 +15,313 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
-public interface RMStateStore {
- public interface RMState {
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
+
+@Private
+@Unstable
+/**
+ * Base class to implement storage of ResourceManager state.
+ * Takes care of asynchronous notifications and interfacing with YARN objects.
+ * Real store implementations need to derive from it and implement blocking
+ * store and load methods to actually store and load the state.
+ */
+public abstract class RMStateStore {
+ public static final Log LOG = LogFactory.getLog(RMStateStore.class);
+
+ /**
+  * State of an application attempt: the attempt id together with the
+  * attempt's master container.
+  */
+ public static class ApplicationAttemptState {
+   final ApplicationAttemptId attemptId;
+   final Container masterContainer;
+
+   public ApplicationAttemptState(ApplicationAttemptId attemptId,
+       Container masterContainer) {
+     this.attemptId = attemptId;
+     this.masterContainer = masterContainer;
+   }
+
+   /** @return the id of the attempt this state belongs to */
+   public ApplicationAttemptId getAttemptId() {
+     return this.attemptId;
+   }
+
+   /** @return the master container recorded for the attempt */
+   public Container getMasterContainer() {
+     return this.masterContainer;
+   }
+ }
+
+ /**
+  * State of an application: its submit time, its submission context and the
+  * states of its attempts keyed by attempt id.
+  */
+ public static class ApplicationState {
+   final ApplicationSubmissionContext context;
+   final long submitTime;
+   Map<ApplicationAttemptId, ApplicationAttemptState> attempts =
+       new HashMap<ApplicationAttemptId, ApplicationAttemptState>();
+
+   ApplicationState(long submitTime, ApplicationSubmissionContext context) {
+     this.context = context;
+     this.submitTime = submitTime;
+   }
+
+   /** @return the application id taken from the submission context */
+   public ApplicationId getAppId() {
+     return this.context.getApplicationId();
+   }
+
+   /** @return the submission context of the application */
+   public ApplicationSubmissionContext getApplicationSubmissionContext() {
+     return this.context;
+   }
+
+   /** @return the recorded submit time */
+   public long getSubmitTime() {
+     return this.submitTime;
+   }
+
+   /** @return the number of attempts recorded for the application */
+   public int getAttemptCount() {
+     return this.attempts.size();
+   }
+
+   /** @return the state stored under the given id, or null if there is none */
+   public ApplicationAttemptState getAttempt(ApplicationAttemptId attemptId) {
+     return this.attempts.get(attemptId);
+   }
+ }
+
+ /**
+  * State of the ResourceManager: the application states keyed by
+  * application id.
+  */
+ public static class RMState {
+   Map<ApplicationId, ApplicationState> appState =
+       new HashMap<ApplicationId, ApplicationState>();
+
+   /** @return the map of application states held by this object */
+   public Map<ApplicationId, ApplicationState> getApplicationState() {
+     return this.appState;
+   }
+ }
+
+ private Dispatcher rmDispatcher;
+
+ /**
+  * Sets the dispatcher through which state operation completion events are
+  * sent to ResourceManager services.
+  *
+  * @param dispatcher the ResourceManager dispatcher
+  */
+ public void setDispatcher(Dispatcher dispatcher) {
+   rmDispatcher = dispatcher;
+ }
+
+ AsyncDispatcher dispatcher;
+
+ /**
+  * Creates and starts the store's own async dispatcher, registers the
+  * forwarding handler for store events on it, and then lets the derived
+  * class initialize itself.
+  *
+  * @param conf configuration handed to the dispatcher and the derived class
+  * @throws Exception propagated from {@code initInternal}
+  */
+ public synchronized void init(Configuration conf) throws Exception{
+   // create async handler
+   this.dispatcher = new AsyncDispatcher();
+   this.dispatcher.init(conf);
+   ForwardingEventHandler storeEventHandler = new ForwardingEventHandler();
+   this.dispatcher.register(RMStateStoreEventType.class, storeEventHandler);
+   this.dispatcher.start();
+
+   initInternal(conf);
+ }
+
+ /**
+ * Derived classes initialize themselves using this method.
+ * The base class is initialized and the event dispatcher is ready to use at
+ * this point
+ */
+ protected abstract void initInternal(Configuration conf) throws Exception;
+
+ /**
+  * Closes the derived store and then stops the store's event dispatcher.
+  * The dispatcher is stopped even when {@code closeInternal} fails, so a
+  * failing store implementation cannot leave the dispatcher running.
+  *
+  * @throws Exception propagated from {@code closeInternal}
+  */
+ public synchronized void close() throws Exception {
+   try {
+     closeInternal();
+   } finally {
+     dispatcher.stop();
+   }
+ }
+
+ /**
+ * Derived classes close themselves using this method.
+ * The base class will be closed and the event dispatcher will be shutdown
+ * after this
+ */
+ protected abstract void closeInternal() throws Exception;
+
+ /**
+ * Blocking API
+ * The derived class must recover state from the store and return a new
+ * RMState object populated with that state
+ * This must not be called on the dispatcher thread
+ */
+ public abstract RMState loadState() throws Exception;
+
+ /**
+  * Blocking API
+  * ResourceManager services use this to store the application's state:
+  * the submit time and submission context are packed into an
+  * ApplicationStateDataPBImpl and handed to the derived class.
+  * This must not be called on the dispatcher thread
+  *
+  * @param app application whose state is stored
+  * @throws Exception propagated from {@code storeApplicationState}
+  */
+ public synchronized void storeApplication(RMApp app) throws Exception {
+   ApplicationSubmissionContext submissionContext =
+       app.getApplicationSubmissionContext();
+   assert submissionContext instanceof ApplicationSubmissionContextPBImpl;
+
+   ApplicationStateDataPBImpl stateData = new ApplicationStateDataPBImpl();
+   stateData.setSubmitTime(app.getSubmitTime());
+   stateData.setApplicationSubmissionContext(submissionContext);
+
+   LOG.info("Storing info for app: " + submissionContext.getApplicationId());
+   String appIdKey = app.getApplicationId().toString();
+   storeApplicationState(appIdKey, stateData);
+ }
+
+ /**
+ * Blocking API
+ * Derived classes must implement this method to store the state of an
+ * application.
+ */
+ protected abstract void storeApplicationState(String appId,
+ ApplicationStateDataPBImpl appStateData)
+ throws Exception;
+
+ /**
+  * Non-blocking API
+  * ResourceManager services call this to store state on an application
+  * attempt. The attempt's id and master container are captured and queued
+  * as an event on the store's own dispatcher.
+  * This does not block the dispatcher threads
+  * RMAppAttemptStoredEvent will be sent on completion to notify the
+  * RMAppAttempt
+  *
+  * @param appAttempt attempt whose state is to be stored
+  */
+ @SuppressWarnings("unchecked")
+ public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) {
+   ApplicationAttemptState snapshot = new ApplicationAttemptState(
+       appAttempt.getAppAttemptId(), appAttempt.getMasterContainer());
+   RMStateStoreAppAttemptEvent storeEvent =
+       new RMStateStoreAppAttemptEvent(snapshot);
+   dispatcher.getEventHandler().handle(storeEvent);
+ }
+
+ /**
+ * Blocking API
+ * Derived classes must implement this method to store the state of an
+ * application attempt
+ */
+ protected abstract void storeApplicationAttemptState(String attemptId,
+ ApplicationAttemptStateDataPBImpl attemptStateData)
+ throws Exception;
+
+
+ /**
+  * Non-blocking API
+  * ResourceManager services call this to remove an application from the
+  * state store. An ApplicationState holding the application's attempts is
+  * built and passed to {@code removeApplication(ApplicationState)}.
+  * This does not block the dispatcher threads
+  * There is no notification of completion for this operation.
+  *
+  * @param app application to remove
+  */
+ public synchronized void removeApplication(RMApp app) {
+   ApplicationState toRemove = new ApplicationState(
+       app.getSubmitTime(), app.getApplicationSubmissionContext());
+   for (RMAppAttempt attempt : app.getAppAttempts().values()) {
+     ApplicationAttemptId attemptId = attempt.getAppAttemptId();
+     toRemove.attempts.put(attemptId,
+         new ApplicationAttemptState(attemptId, attempt.getMasterContainer()));
+   }
+
+   removeApplication(toRemove);
+ }
+
+ /**
+  * Non-Blocking API
+  * Queues a remove event for the given application state on the store's own
+  * dispatcher.
+  *
+  * @param appState state of the application to remove
+  */
+ @SuppressWarnings("unchecked")
+ public synchronized void removeApplication(ApplicationState appState) {
+   RMStateStoreRemoveAppEvent removeEvent =
+       new RMStateStoreRemoveAppEvent(appState);
+   dispatcher.getEventHandler().handle(removeEvent);
+ }
+
+ /**
+ * Blocking API
+ * Derived classes must implement this method to remove the state of an
+ * application and its attempts
+ */
+ protected abstract void removeApplicationState(ApplicationState appState)
+ throws Exception;
+
+ // Dispatcher related code
+
+ /**
+  * Runs the blocking store operation that corresponds to a queued store
+  * event. Unknown event types are logged and otherwise ignored.
+  */
+ private synchronized void handleStoreEvent(RMStateStoreEvent event) {
+   switch (event.getType()) {
+     case STORE_APP_ATTEMPT:
+       persistAttempt(
+           ((RMStateStoreAppAttemptEvent) event).getAppAttemptState());
+       break;
+     case REMOVE_APP:
+       discardApplication(
+           ((RMStateStoreRemoveAppEvent) event).getAppState());
+       break;
+     default:
+       LOG.error("Unknown RMStateStoreEvent type: " + event.getType());
+   }
+ }
+
+ /**
+  * Stores one attempt and always notifies the attempt afterwards, passing
+  * along the exception if storing failed.
+  */
+ private void persistAttempt(ApplicationAttemptState attemptState) {
+   Exception failure = null;
+   ApplicationAttemptStateDataPBImpl stateData =
+       new ApplicationAttemptStateDataPBImpl();
+   stateData.setAttemptId(attemptState.getAttemptId());
+   stateData.setMasterContainer(attemptState.getMasterContainer());
+
+   LOG.info("Storing info for attempt: " + attemptState.getAttemptId());
+   try {
+     String attemptKey = attemptState.getAttemptId().toString();
+     storeApplicationAttemptState(attemptKey, stateData);
+   } catch (Exception e) {
+     LOG.error("Error storing appAttempt: "
+         + attemptState.getAttemptId(), e);
+     failure = e;
+   } finally {
+     // the notification is sent whether or not the store call succeeded
+     notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
+         failure);
+   }
+ }
+
+ /** Removes one application's state; a failure is logged, not rethrown. */
+ private void discardApplication(ApplicationState appState) {
+   ApplicationId appId = appState.getAppId();
+
+   LOG.info("Removing info for app: " + appId);
+   try {
+     removeApplicationState(appState);
+   } catch (Exception e) {
+     LOG.error("Error removing app: " + appId, e);
+   }
+ }
+
+ /**
+  * Notifies the application attempt, through the ResourceManager dispatcher,
+  * that the store operation for it has completed.
+  *
+  * @param attemptId id of the attempt whose store operation finished
+  * @param storedException exception raised while storing, or null if the
+  *          operation succeeded
+  */
+ @SuppressWarnings("unchecked")
+ private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId,
+     Exception storedException) {
+   RMAppAttemptStoredEvent storedEvent =
+       new RMAppAttemptStoredEvent(attemptId, storedException);
+   rmDispatcher.getEventHandler().handle(storedEvent);
+ }
+
+ /**
+  * EventHandler implementation which forwards events to the enclosing
+  * RMStateStore.
+  * This hides the EventHandler methods of the store from its public interface
+  */
+ private final class ForwardingEventHandler
+     implements EventHandler<RMStateStoreEvent> {
+
+   @Override
+   public void handle(RMStateStoreEvent storeEvent) {
+     RMStateStore.this.handleStoreEvent(storeEvent);
+   }
 }
+
}
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/package-info.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/package-info.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/package-info.java Fri Jan 11 19:40:23 2013
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java Fri Jan 11 19:40:23 2013
@@ -44,6 +44,12 @@ public interface RMApp extends EventHand
* @return the {@link ApplicationId} for this {@link RMApp}.
*/
ApplicationId getApplicationId();
+
+ /**
+ * The application submission context for this {@link RMApp}
+ * @return the {@link ApplicationSubmissionContext} for this {@link RMApp}
+ */
+ ApplicationSubmissionContext getApplicationSubmissionContext();
/**
* The current state of the {@link RMApp}.
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Fri Jan 11 19:40:23 2013
@@ -50,6 +50,9 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
@@ -66,7 +69,7 @@ import org.apache.hadoop.yarn.state.Stat
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
-public class RMAppImpl implements RMApp {
+public class RMAppImpl implements RMApp, Recoverable {
private static final Log LOG = LogFactory.getLog(RMAppImpl.class);
private static final String UNAVAILABLE = "N/A";
@@ -243,6 +246,11 @@ public class RMAppImpl implements RMApp
public ApplicationId getApplicationId() {
return this.applicationId;
}
+
+ @Override
+ public ApplicationSubmissionContext getApplicationSubmissionContext() {
+ return this.submissionContext;
+ }
@Override
public FinalApplicationStatus getFinalApplicationStatus() {
@@ -512,9 +520,22 @@ public class RMAppImpl implements RMApp
this.writeLock.unlock();
}
}
+
+ @Override
+ public void recover(RMState state) {
+ ApplicationState appState = state.getApplicationState().get(getApplicationId());
+ LOG.info("Recovering app: " + getApplicationId() + " with " +
+ + appState.getAttemptCount() + " attempts");
+ for(int i=0; i<appState.getAttemptCount(); ++i) {
+ // create attempt
+ createNewAttempt(false);
+ // recover attempt
+ ((RMAppAttemptImpl) currentAttempt).recover(state);
+ }
+ }
@SuppressWarnings("unchecked")
- private void createNewAttempt() {
+ private void createNewAttempt(boolean startAttempt) {
ApplicationAttemptId appAttemptId = Records
.newRecord(ApplicationAttemptId.class);
appAttemptId.setApplicationId(applicationId);
@@ -525,8 +546,10 @@ public class RMAppImpl implements RMApp
submissionContext, conf);
attempts.put(appAttemptId, attempt);
currentAttempt = attempt;
- handler.handle(
- new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
+ if(startAttempt) {
+ handler.handle(
+ new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
+ }
}
private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) {
@@ -553,7 +576,7 @@ public class RMAppImpl implements RMApp
private static final class StartAppAttemptTransition extends RMAppTransition {
public void transition(RMAppImpl app, RMAppEvent event) {
- app.createNewAttempt();
+ app.createNewAttempt(true);
};
}
@@ -647,7 +670,7 @@ public class RMAppImpl implements RMApp
msg = "Unmanaged application " + app.getApplicationId()
+ " failed due to " + failedEvent.getDiagnostics()
+ ". Failing the application.";
- } else if (app.attempts.size() == app.maxRetries) {
+ } else if (app.attempts.size() >= app.maxRetries) {
retryApp = false;
msg = "Application " + app.getApplicationId() + " failed "
+ app.maxRetries + " times due to " + failedEvent.getDiagnostics()
@@ -655,7 +678,7 @@ public class RMAppImpl implements RMApp
}
if (retryApp) {
- app.createNewAttempt();
+ app.createNewAttempt(true);
return initialState;
} else {
LOG.info(msg);
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java Fri Jan 11 19:40:23 2013
@@ -39,9 +39,15 @@ public enum RMAppAttemptEventType {
CONTAINER_ACQUIRED,
CONTAINER_ALLOCATED,
CONTAINER_FINISHED,
+
+ // Source: RMStateStore
+ ATTEMPT_SAVED,
// Source: Scheduler
APP_REJECTED,
APP_ACCEPTED,
+
+ // Source: RMAppAttemptImpl.recover
+ RECOVER
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Fri Jan 11 19:40:23 2013
@@ -38,6 +38,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -57,6 +58,11 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -69,6 +75,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -85,7 +92,7 @@ import org.apache.hadoop.yarn.state.Stat
import org.apache.hadoop.yarn.util.BuilderUtils;
@SuppressWarnings({"unchecked", "rawtypes"})
-public class RMAppAttemptImpl implements RMAppAttempt {
+public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private static final Log LOG = LogFactory.getLog(RMAppAttemptImpl.class);
@@ -153,12 +160,15 @@ public class RMAppAttemptImpl implements
.addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FAILED,
RMAppAttemptEventType.REGISTERED,
new UnexpectedAMRegisteredTransition())
+ .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.RECOVERED,
+ RMAppAttemptEventType.RECOVER)
// Transitions from SUBMITTED state
.addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FAILED,
RMAppAttemptEventType.APP_REJECTED, new AppRejectedTransition())
.addTransition(RMAppAttemptState.SUBMITTED,
- EnumSet.of(RMAppAttemptState.LAUNCHED, RMAppAttemptState.SCHEDULED),
+ EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
+ RMAppAttemptState.SCHEDULED),
RMAppAttemptEventType.APP_ACCEPTED,
new ScheduleTransition())
.addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.KILLED,
@@ -170,12 +180,42 @@ public class RMAppAttemptImpl implements
// Transitions from SCHEDULED State
.addTransition(RMAppAttemptState.SCHEDULED,
- RMAppAttemptState.ALLOCATED,
+ RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptEventType.CONTAINER_ALLOCATED,
new AMContainerAllocatedTransition())
.addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.KILLED,
RMAppAttemptEventType.KILL,
new BaseFinalTransition(RMAppAttemptState.KILLED))
+
+ // Transitions from ALLOCATED_SAVING State
+ .addTransition(RMAppAttemptState.ALLOCATED_SAVING,
+ RMAppAttemptState.ALLOCATED,
+ RMAppAttemptEventType.ATTEMPT_SAVED, new AttemptStoredTransition())
+ .addTransition(RMAppAttemptState.ALLOCATED_SAVING,
+ RMAppAttemptState.ALLOCATED_SAVING,
+ RMAppAttemptEventType.CONTAINER_ACQUIRED,
+ new ContainerAcquiredTransition())
+ // App could be killed by the client. So need to handle this.
+ .addTransition(RMAppAttemptState.ALLOCATED_SAVING,
+ RMAppAttemptState.KILLED,
+ RMAppAttemptEventType.KILL,
+ new BaseFinalTransition(RMAppAttemptState.KILLED))
+
+ // Transitions from LAUNCHED_UNMANAGED_SAVING State
+ .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
+ RMAppAttemptState.LAUNCHED,
+ RMAppAttemptEventType.ATTEMPT_SAVED,
+ new UnmanagedAMAttemptSavedTransition())
+ // attempt should not try to register in this state
+ .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
+ RMAppAttemptState.FAILED,
+ RMAppAttemptEventType.REGISTERED,
+ new UnexpectedAMRegisteredTransition())
+ // App could be killed by the client. So need to handle this.
+ .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
+ RMAppAttemptState.KILLED,
+ RMAppAttemptEventType.KILL,
+ new BaseFinalTransition(RMAppAttemptState.KILLED))
// Transitions from ALLOCATED State
.addTransition(RMAppAttemptState.ALLOCATED,
@@ -279,11 +319,30 @@ public class RMAppAttemptImpl implements
RMAppAttemptEventType.EXPIRE,
RMAppAttemptEventType.REGISTERED,
RMAppAttemptEventType.CONTAINER_ALLOCATED,
+ RMAppAttemptEventType.ATTEMPT_SAVED,
+ RMAppAttemptEventType.CONTAINER_FINISHED,
+ RMAppAttemptEventType.UNREGISTERED,
+ RMAppAttemptEventType.KILL,
+ RMAppAttemptEventType.STATUS_UPDATE))
+
+ // Transitions from RECOVERED State
+ .addTransition(
+ RMAppAttemptState.RECOVERED,
+ RMAppAttemptState.RECOVERED,
+ EnumSet.of(RMAppAttemptEventType.START,
+ RMAppAttemptEventType.APP_ACCEPTED,
+ RMAppAttemptEventType.APP_REJECTED,
+ RMAppAttemptEventType.EXPIRE,
+ RMAppAttemptEventType.LAUNCHED,
+ RMAppAttemptEventType.LAUNCH_FAILED,
+ RMAppAttemptEventType.REGISTERED,
+ RMAppAttemptEventType.CONTAINER_ALLOCATED,
+ RMAppAttemptEventType.CONTAINER_ACQUIRED,
+ RMAppAttemptEventType.ATTEMPT_SAVED,
RMAppAttemptEventType.CONTAINER_FINISHED,
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.KILL,
RMAppAttemptEventType.STATUS_UPDATE))
-
.installTopology();
public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
@@ -318,7 +377,7 @@ public class RMAppAttemptImpl implements
@Override
public ApplicationSubmissionContext getSubmissionContext() {
return this.submissionContext;
- }
+ }
@Override
public FinalApplicationStatus getFinalApplicationStatus() {
@@ -494,6 +553,10 @@ public class RMAppAttemptImpl implements
}
}
+ private void setMasterContainer(Container container) {
+ masterContainer = container;
+ }
+
@Override
public void handle(RMAppAttemptEvent event) {
@@ -561,6 +624,21 @@ public class RMAppAttemptImpl implements
}
}
+ @Override
+ public void recover(RMState state) {
+ ApplicationState appState =
+ state.getApplicationState().get(getAppAttemptId().getApplicationId());
+ ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId());
+ assert attemptState != null;
+ setMasterContainer(attemptState.getMasterContainer());
+ LOG.info("Recovered attempt: AppId: " + getAppAttemptId().getApplicationId()
+ + " AttemptId: " + getAppAttemptId()
+ + " MasterContainer: " + masterContainer);
+ setDiagnostics("Attempt recovered after RM restart");
+ handle(new RMAppAttemptEvent(getAppAttemptId(),
+ RMAppAttemptEventType.RECOVER));
+ }
+
private static class BaseTransition implements
SingleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent> {
@@ -625,13 +703,12 @@ public class RMAppAttemptImpl implements
@Override
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
-
- // Send the acceptance to the app
- appAttempt.eventHandler.handle(new RMAppEvent(event
- .getApplicationAttemptId().getApplicationId(),
- RMAppEventType.APP_ACCEPTED));
-
if (!appAttempt.submissionContext.getUnmanagedAM()) {
+ // Send the acceptance to the app
+ appAttempt.eventHandler.handle(new RMAppEvent(event
+ .getApplicationAttemptId().getApplicationId(),
+ RMAppEventType.APP_ACCEPTED));
+
// Request a container for the AM.
ResourceRequest request = BuilderUtils.newResourceRequest(
AM_CONTAINER_PRIORITY, "*", appAttempt.submissionContext
@@ -647,35 +724,42 @@ public class RMAppAttemptImpl implements
return RMAppAttemptState.SCHEDULED;
} else {
// RM not allocating container. AM is self launched.
- // Directly go to LAUNCHED state
- // Register with AMLivelinessMonitor
- appAttempt.rmContext.getAMLivelinessMonitor().register(
- appAttempt.applicationAttemptId);
- return RMAppAttemptState.LAUNCHED;
+ RMStateStore store = appAttempt.rmContext.getStateStore();
+ // save state and then go to LAUNCHED state
+ appAttempt.storeAttempt(store);
+ return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING;
}
}
}
- private static final class AMContainerAllocatedTransition extends BaseTransition {
+ private static final class AMContainerAllocatedTransition
+ extends BaseTransition {
@Override
public void transition(RMAppAttemptImpl appAttempt,
- RMAppAttemptEvent event) {
-
+ RMAppAttemptEvent event) {
// Acquire the AM container from the scheduler.
Allocation amContainerAllocation = appAttempt.scheduler.allocate(
appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST,
EMPTY_CONTAINER_RELEASE_LIST);
// Set the masterContainer
- appAttempt.masterContainer = amContainerAllocation.getContainers().get(
- 0);
+ appAttempt.setMasterContainer(amContainerAllocation.getContainers().get(
+ 0));
- // Send event to launch the AM Container
- appAttempt.eventHandler.handle(new AMLauncherEvent(
- AMLauncherEventType.LAUNCH, appAttempt));
+ RMStateStore store = appAttempt.rmContext.getStateStore();
+ appAttempt.storeAttempt(store);
}
}
-
+
+ private static final class AttemptStoredTransition extends BaseTransition {
+ @Override
+ public void transition(RMAppAttemptImpl appAttempt,
+ RMAppAttemptEvent event) {
+ appAttempt.checkAttemptStoreError(event);
+ appAttempt.launchAttempt();
+ }
+ }
+
private static class BaseFinalTransition extends BaseTransition {
private final RMAppAttemptState finalAttemptState;
@@ -736,17 +820,34 @@ public class RMAppAttemptImpl implements
}
}
- private static final class AMLaunchedTransition extends BaseTransition {
+ private static class AMLaunchedTransition extends BaseTransition {
@Override
public void transition(RMAppAttemptImpl appAttempt,
- RMAppAttemptEvent event) {
-
+ RMAppAttemptEvent event) {
// Register with AMLivelinessMonitor
- appAttempt.rmContext.getAMLivelinessMonitor().register(
- appAttempt.applicationAttemptId);
-
+ appAttempt.attemptLaunched();
}
}
+
+ private static final class UnmanagedAMAttemptSavedTransition
+ extends AMLaunchedTransition {
+ @Override
+ public void transition(RMAppAttemptImpl appAttempt,
+ RMAppAttemptEvent event) {
+ appAttempt.checkAttemptStoreError(event);
+ // Send the acceptance to the app
+ // Ideally this should have been done when the scheduler accepted the app.
+ // But it's here because until the attempt is saved the client should not
+ // launch the unmanaged AM. Client waits for the app status to be accepted
+ // before doing so. So we have to delay the accepted state until we have
+ // completed storing the attempt
+ appAttempt.eventHandler.handle(new RMAppEvent(event
+ .getApplicationAttemptId().getApplicationId(),
+ RMAppEventType.APP_ACCEPTED));
+
+ super.transition(appAttempt, event);
+ }
+ }
private static final class LaunchFailedTransition extends BaseFinalTransition {
@@ -1040,4 +1141,37 @@ public class RMAppAttemptImpl implements
this.readLock.unlock();
}
}
+
+ private void launchAttempt(){
+ // Send event to launch the AM Container
+ eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this));
+ }
+
+ private void attemptLaunched() {
+ // Register with AMLivelinessMonitor
+ rmContext.getAMLivelinessMonitor().register(getAppAttemptId());
+ }
+
+ private void checkAttemptStoreError(RMAppAttemptEvent event) {
+ RMAppAttemptStoredEvent storeEvent = (RMAppAttemptStoredEvent) event;
+ if(storeEvent.getStoredException() != null)
+ {
+ // This needs to be handled for HA and give up master status if we got
+ // fenced
+ LOG.error("Failed to store attempt: " + getAppAttemptId(),
+ storeEvent.getStoredException());
+ ExitUtil.terminate(1, storeEvent.getStoredException());
+ }
+ }
+
+ private void storeAttempt(RMStateStore store) {
+ // store attempt data in a non-blocking manner to prevent dispatcher
+ // thread starvation; the attempt waits in a *_SAVING state for ATTEMPT_SAVED
+ LOG.info("Storing attempt: AppId: " +
+ getAppAttemptId().getApplicationId()
+ + " AttemptId: " +
+ getAppAttemptId()
+ + " MasterContainer: " + masterContainer);
+ store.storeApplicationAttempt(this);
+ }
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java Fri Jan 11 19:40:23 2013
@@ -19,6 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
public enum RMAppAttemptState {
- NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING,
- FINISHING, FINISHED, KILLED,
+ NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING, FINISHING,
+ FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, RECOVERED
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java Fri Jan 11 19:40:23 2013
@@ -199,6 +199,11 @@ public class RMContainerImpl implements
}
@Override
+ public String toString() {
+ return containerId.toString();
+ }
+
+ @Override
public void handle(RMContainerEvent event) {
LOG.debug("Processing " + event.getContainerId() + " of type " + event.getType());
try {
@@ -221,7 +226,7 @@ public class RMContainerImpl implements
writeLock.unlock();
}
}
-
+
private static class BaseTransition implements
SingleArcTransition<RMContainerImpl, RMContainerEvent> {
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Fri Jan 11 19:40:23 2013
@@ -177,7 +177,9 @@ public class FSLeafQueue extends FSQueue
Collections.sort(appScheds, comparator);
for (AppSchedulable sched: appScheds) {
- return sched.assignContainer(node, reserved);
+ if (sched.getRunnable()) {
+ return sched.assignContainer(node, reserved);
+ }
}
return Resources.none();
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Fri Jan 11 19:40:23 2013
@@ -514,7 +514,6 @@ public class FairScheduler implements Re
queue.addApp(schedulerApp);
queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId());
- rootMetrics.submitApp(user, applicationAttemptId.getAttemptId());
applications.put(applicationAttemptId, schedulerApp);
@@ -777,7 +776,8 @@ public class FairScheduler implements Re
boolean assignedContainer = false;
for (FSLeafQueue sched : scheds) {
Resource assigned = sched.assignContainer(node, false);
- if (Resources.greaterThan(assigned, Resources.none())) {
+ if (Resources.greaterThan(assigned, Resources.none()) ||
+ node.getReservedContainer() != null) {
eventLog.log("ASSIGN", nm.getHostName(), assigned);
assignedContainers++;
assignedContainer = true;
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Fri Jan 11 19:40:23 2013
@@ -227,6 +227,9 @@ public class QueueManager {
* Return whether a queue exists already.
*/
public boolean exists(String name) {
+ if (!name.startsWith(ROOT_QUEUE + ".") && !name.equals(ROOT_QUEUE)) {
+ name = ROOT_QUEUE + "." + name;
+ }
synchronized (queues) {
return queues.containsKey(name);
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java Fri Jan 11 19:40:23 2013
@@ -276,21 +276,26 @@ public class DelegationTokenRenewer exte
Collection <Token<?>> tokens = ts.getAllTokens();
long now = System.currentTimeMillis();
+ // find tokens for renewal, but don't add timers until we know
+ // all renewable tokens are valid
+ Set<DelegationTokenToRenew> dtrs = new HashSet<DelegationTokenToRenew>();
for(Token<?> token : tokens) {
// first renew happens immediately
if (token.isManaged()) {
DelegationTokenToRenew dtr =
new DelegationTokenToRenew(applicationId, token, getConfig(), now,
shouldCancelAtEnd);
-
- addTokenToList(dtr);
-
- setTimerForTokenRenewal(dtr, true);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Registering token for renewal for:" +
- " service = " + token.getService() +
- " for appId = " + applicationId);
- }
+ renewToken(dtr);
+ dtrs.add(dtr);
+ }
+ }
+ for (DelegationTokenToRenew dtr : dtrs) {
+ addTokenToList(dtr);
+ setTimerForTokenRenewal(dtr);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Registering token for renewal for:" +
+ " service = " + dtr.token.getService() +
+ " for appId = " + applicationId);
}
}
}
@@ -301,54 +306,49 @@ public class DelegationTokenRenewer exte
*/
private class RenewalTimerTask extends TimerTask {
private DelegationTokenToRenew dttr;
+ private boolean cancelled = false;
RenewalTimerTask(DelegationTokenToRenew t) {
dttr = t;
}
@Override
- public void run() {
+ public synchronized void run() {
+ if (cancelled) {
+ return;
+ }
+
Token<?> token = dttr.token;
try {
- // need to use doAs so that http can find the kerberos tgt
- dttr.expirationDate = UserGroupInformation.getLoginUser()
- .doAs(new PrivilegedExceptionAction<Long>(){
-
- @Override
- public Long run() throws Exception {
- return dttr.token.renew(dttr.conf);
- }
- });
-
+ renewToken(dttr);
if (LOG.isDebugEnabled()) {
LOG.debug("Renewing delegation-token for:" + token.getService() +
"; new expiration;" + dttr.expirationDate);
}
- setTimerForTokenRenewal(dttr, false);// set the next one
+ setTimerForTokenRenewal(dttr);// set the next one
} catch (Exception e) {
LOG.error("Exception renewing token" + token + ". Not rescheduled", e);
removeFailedDelegationToken(dttr);
}
}
+
+ @Override
+ public synchronized boolean cancel() {
+ cancelled = true;
+ return super.cancel();
+ }
}
/**
* set task to renew the token
*/
- private
- void setTimerForTokenRenewal(DelegationTokenToRenew token,
- boolean firstTime) throws IOException {
+ private void setTimerForTokenRenewal(DelegationTokenToRenew token)
+ throws IOException {
// calculate timer time
- long now = System.currentTimeMillis();
- long renewIn;
- if(firstTime) {
- renewIn = now;
- } else {
- long expiresIn = (token.expirationDate - now);
- renewIn = now + expiresIn - expiresIn/10; // little bit before the expiration
- }
+ long expiresIn = token.expirationDate - System.currentTimeMillis();
+ long renewIn = token.expirationDate - expiresIn/10; // little bit before the expiration
// need to create new task every time
TimerTask tTask = new RenewalTimerTask(token);
@@ -357,6 +357,24 @@ public class DelegationTokenRenewer exte
renewalTimer.schedule(token.timerTask, new Date(renewIn));
}
+ // renew a token
+ private void renewToken(final DelegationTokenToRenew dttr)
+ throws IOException {
+ // need to use doAs so that http can find the kerberos tgt
+ // NOTE: token renewers should be responsible for the correct UGI!
+ try {
+ dttr.expirationDate = UserGroupInformation.getLoginUser().doAs(
+ new PrivilegedExceptionAction<Long>(){
+ @Override
+ public Long run() throws Exception {
+ return dttr.token.renew(dttr.conf);
+ }
+ });
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
// cancel a token
private void cancelToken(DelegationTokenToRenew t) {
if(t.shouldCancelAtEnd) {
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java Fri Jan 11 19:40:23 2013
@@ -84,12 +84,12 @@ class AppsBlock extends HtmlBlock {
appsTableData.append("[\"<a href='")
.append(url("app", appInfo.getAppId())).append("'>")
.append(appInfo.getAppId()).append("</a>\",\"")
- .append(StringEscapeUtils.escapeHtml(appInfo.getUser()))
- .append("\",\"")
- .append(StringEscapeUtils.escapeHtml(appInfo.getName()))
- .append("\",\"")
- .append(StringEscapeUtils.escapeHtml(appInfo.getQueue()))
- .append("\",\"")
+ .append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml(
+ appInfo.getUser()))).append("\",\"")
+ .append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml(
+ appInfo.getName()))).append("\",\"")
+ .append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml(
+ appInfo.getQueue()))).append("\",\"")
.append(appInfo.getStartTime()).append("\",\"")
.append(appInfo.getFinishTime()).append("\",\"")
.append(appInfo.getState()).append("\",\"")
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java Fri Jan 11 19:40:23 2013
@@ -20,13 +20,14 @@ package org.apache.hadoop.yarn.server.re
import static org.apache.hadoop.yarn.util.StringHelper.join;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_STATE;
-import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR;
-import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR_VALUE;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_PROGRESSBAR;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_PROGRESSBAR_VALUE;
import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.lang.StringEscapeUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -36,7 +37,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo;
-import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
@@ -86,44 +86,52 @@ public class FairSchedulerAppsBlock exte
reqAppStates.add(RMAppState.valueOf(stateString));
}
}
+ StringBuilder appsTableData = new StringBuilder("[\n");
for (RMApp app : apps.values()) {
if (reqAppStates != null && !reqAppStates.contains(app.getState())) {
continue;
}
AppInfo appInfo = new AppInfo(app, true);
String percent = String.format("%.1f", appInfo.getProgress());
- String startTime = Times.format(appInfo.getStartTime());
- String finishTime = Times.format(appInfo.getFinishTime());
ApplicationAttemptId attemptId = app.getCurrentAppAttempt().getAppAttemptId();
int fairShare = fsinfo.getAppFairShare(attemptId);
+ //AppID numerical value parsed by parseHadoopID in yarn.dt.plugins.js
+ appsTableData.append("[\"<a href='")
+ .append(url("app", appInfo.getAppId())).append("'>")
+ .append(appInfo.getAppId()).append("</a>\",\"")
+ .append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml(
+ appInfo.getUser()))).append("\",\"")
+ .append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml(
+ appInfo.getName()))).append("\",\"")
+ .append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml(
+ appInfo.getQueue()))).append("\",\"")
+ .append(fairShare).append("\",\"")
+ .append(appInfo.getStartTime()).append("\",\"")
+ .append(appInfo.getFinishTime()).append("\",\"")
+ .append(appInfo.getState()).append("\",\"")
+ .append(appInfo.getFinalStatus()).append("\",\"")
+ // Progress bar
+ .append("<br title='").append(percent)
+ .append("'> <div class='").append(C_PROGRESSBAR).append("' title='")
+ .append(join(percent, '%')).append("'> ").append("<div class='")
+ .append(C_PROGRESSBAR_VALUE).append("' style='")
+ .append(join("width:", percent, '%')).append("'> </div> </div>")
+ .append("\",\"<a href='");
+
+ String trackingURL =
+ !appInfo.isTrackingUrlReady()? "#" : appInfo.getTrackingUrlPretty();
+
+ appsTableData.append(trackingURL).append("'>")
+ .append(appInfo.getTrackingUI()).append("</a>\"],\n");
- tbody.
- tr().
- td().
- br().$title(appInfo.getAppIdNum())._(). // for sorting
- a(url("app", appInfo.getAppId()), appInfo.getAppId())._().
- td(appInfo.getUser()).
- td(appInfo.getName()).
- td(appInfo.getQueue()).
- td("" + fairShare).
- td().
- br().$title(String.valueOf(appInfo.getStartTime()))._().
- _(startTime)._().
- td().
- br().$title(String.valueOf(appInfo.getFinishTime()))._().
- _(finishTime)._().
- td(appInfo.getState()).
- td(appInfo.getFinalStatus()).
- td().
- br().$title(percent)._(). // for sorting
- div(_PROGRESSBAR).
- $title(join(percent, '%')). // tooltip
- div(_PROGRESSBAR_VALUE).
- $style(join("width:", percent, '%'))._()._()._().
- td().
- a(!appInfo.isTrackingUrlReady()?
- "#" : appInfo.getTrackingUrlPretty(), appInfo.getTrackingUI())._()._();
}
+ if(appsTableData.charAt(appsTableData.length() - 2) == ',') {
+ appsTableData.delete(appsTableData.length()-2, appsTableData.length()-1);
+ }
+ appsTableData.append("]");
+ html.script().$type("text/javascript").
+ _("var appsTableData=" + appsTableData)._();
+
tbody._()._();
}
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java Fri Jan 11 19:40:23 2013
@@ -20,16 +20,18 @@ package org.apache.hadoop.yarn.server.re
import static org.apache.hadoop.yarn.util.StringHelper.join;
-import java.util.List;
+import java.util.Collection;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerLeafQueueInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerQueueInfo;
import org.apache.hadoop.yarn.webapp.ResponseInfo;
import org.apache.hadoop.yarn.webapp.SubView;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.LI;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.UL;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import org.apache.hadoop.yarn.webapp.view.InfoBlock;
@@ -48,16 +50,15 @@ public class FairSchedulerPage extends R
@RequestScoped
static class FSQInfo {
- FairSchedulerInfo fsinfo;
FairSchedulerQueueInfo qinfo;
}
- static class QueueInfoBlock extends HtmlBlock {
- final FairSchedulerQueueInfo qinfo;
+ static class LeafQueueBlock extends HtmlBlock {
+ final FairSchedulerLeafQueueInfo qinfo;
- @Inject QueueInfoBlock(ViewContext ctx, FSQInfo info) {
+ @Inject LeafQueueBlock(ViewContext ctx, FSQInfo info) {
super(ctx);
- qinfo = (FairSchedulerQueueInfo) info.qinfo;
+ qinfo = (FairSchedulerLeafQueueInfo)info.qinfo;
}
@Override
@@ -81,6 +82,47 @@ public class FairSchedulerPage extends R
}
}
+ // Renders the child queues of the queue currently held in the injected
+ // FSQInfo as a list ("#pq"), nesting a further block under each child:
+ // a LeafQueueBlock for leaf queues, another QueueBlock otherwise.
+ static class QueueBlock extends HtmlBlock {
+ // Holder through which the queue to render is handed to this block and to
+ // the nested blocks. FSQInfo is declared @RequestScoped, so presumably all
+ // blocks of one request see the same instance -- TODO confirm.
+ final FSQInfo fsqinfo;
+
+ @Inject QueueBlock(FSQInfo info) {
+ fsqinfo = info;
+ }
+
+ @Override
+ public void render(Block html) {
+ // Read the children up front: fsqinfo.qinfo is overwritten inside the
+ // loop (and by any nested QueueBlock), so it cannot be re-read later.
+ Collection<FairSchedulerQueueInfo> subQueues = fsqinfo.qinfo.getChildQueues();
+ UL<Hamlet> ul = html.ul("#pq");
+ for (FairSchedulerQueueInfo info : subQueues) {
+ // Fractions used to size the bar drawn for this queue.
+ // NOTE(review): fairShare and used are divided by capacity below; a
+ // capacity of 0 would yield a non-finite width -- confirm that
+ // getMaxResourcesFraction() cannot return 0.
+ float capacity = info.getMaxResourcesFraction();
+ float fairShare = info.getFairShareFraction();
+ float used = info.getUsedFraction();
+ // Bar for this queue: the link is as wide as capacity * Q_MAX_WIDTH and
+ // carries the fair share as its title. The first span marks the fair
+ // share, the second the usage (Q_OVER when usage exceeds the fair share,
+ // Q_UNDER otherwise); both widths are relative to capacity.
+ LI<UL<Hamlet>> li = ul.
+ li().
+ a(_Q).$style(width(capacity * Q_MAX_WIDTH)).
+ $title(join("Fair Share:", percent(fairShare))).
+ span().$style(join(Q_GIVEN, ";font-size:1px;", width(fairShare/capacity))).
+ _('.')._().
+ span().$style(join(width(used/capacity),
+ ";font-size:1px;left:0%;", used > fairShare ? Q_OVER : Q_UNDER)).
+ _('.')._().
+ span(".q", info.getQueueName())._().
+ span().$class("qstats").$style(left(Q_STATS_POS)).
+ _(join(percent(used), " used"))._();
+
+ // Publish this child before rendering the nested block: LeafQueueBlock
+ // reads info.qinfo in its constructor and a nested QueueBlock reads
+ // fsqinfo.qinfo at the start of render().
+ fsqinfo.qinfo = info;
+ if (info instanceof FairSchedulerLeafQueueInfo) {
+ li.ul("#lq").li()._(LeafQueueBlock.class)._()._();
+ } else {
+ // Non-leaf queue: recurse to render its own children.
+ li._(QueueBlock.class);
+ }
+ li._();
+ }
+
+ ul._();
+ }
+ }
+
static class QueuesBlock extends HtmlBlock {
final FairScheduler fs;
final FSQInfo fsqinfo;
@@ -89,8 +131,9 @@ public class FairSchedulerPage extends R
fs = (FairScheduler)rm.getResourceScheduler();
fsqinfo = info;
}
-
- @Override public void render(Block html) {
+
+ @Override
+ public void render(Block html) {
html._(MetricsOverviewTable.class);
UL<DIV<DIV<Hamlet>>> ul = html.
div("#cs-wrapper.ui-widget").
@@ -106,8 +149,8 @@ public class FairSchedulerPage extends R
span(".q", "default")._()._();
} else {
FairSchedulerInfo sinfo = new FairSchedulerInfo(fs);
- fsqinfo.fsinfo = sinfo;
- fsqinfo.qinfo = null;
+ fsqinfo.qinfo = sinfo.getRootQueueInfo();
+ float used = fsqinfo.qinfo.getUsedFraction();
ul.
li().$style("margin-bottom: 1em").
@@ -120,29 +163,15 @@ public class FairSchedulerPage extends R
_("Used (over fair share)")._().
span().$class("qlegend ui-corner-all ui-state-default").
_("Max Capacity")._().
- _();
-
- List<FairSchedulerQueueInfo> subQueues = fsqinfo.fsinfo.getQueueInfos();
- for (FairSchedulerQueueInfo info : subQueues) {
- fsqinfo.qinfo = info;
- float capacity = info.getMaxResourcesFraction();
- float fairShare = info.getFairShareFraction();
- float used = info.getUsedFraction();
- ul.
- li().
- a(_Q).$style(width(capacity * Q_MAX_WIDTH)).
- $title(join("Fair Share:", percent(fairShare))).
- span().$style(join(Q_GIVEN, ";font-size:1px;", width(fairShare/capacity))).
- _('.')._().
- span().$style(join(width(used/capacity),
- ";font-size:1px;left:0%;", used > fairShare ? Q_OVER : Q_UNDER)).
- _('.')._().
- span(".q", info.getQueueName())._().
- span().$class("qstats").$style(left(Q_STATS_POS)).
- _(join(percent(used), " used"))._().
- ul("#lq").li()._(QueueInfoBlock.class)._()._().
- _();
- }
+ _().
+ li().
+ a(_Q).$style(width(Q_MAX_WIDTH)).
+ span().$style(join(width(used), ";left:0%;",
+ used > 1 ? Q_OVER : Q_UNDER))._(".")._().
+ span(".q", "root")._().
+ span().$class("qstats").$style(left(Q_STATS_POS)).
+ _(join(percent(used), " used"))._().
+ _(QueueBlock.class)._();
}
ul._()._().
script().$type("text/javascript").
@@ -159,13 +188,16 @@ public class FairSchedulerPage extends R
"#cs a { font-weight: normal; margin: 2px; position: relative }",
"#cs a span { font-weight: normal; font-size: 80% }",
"#cs-wrapper .ui-widget-header { padding: 0.2em 0.5em }",
+ ".qstats { font-weight: normal; font-size: 80%; position: absolute }",
+ ".qlegend { font-weight: normal; padding: 0 1em; margin: 1em }",
"table.info tr th {width: 50%}")._(). // to center info table
script("/static/jt/jquery.jstree.js").
script().$type("text/javascript").
_("$(function() {",
" $('#cs a span').addClass('ui-corner-all').css('position', 'absolute');",
" $('#cs').bind('loaded.jstree', function (e, data) {",
- " data.inst.open_all(); }).",
+ " data.inst.open_node('#pq', true);",
+ " }).",
" jstree({",
" core: { animation: 188, html_titles: true },",
" plugins: ['themeroller', 'html_data', 'ui'],",
@@ -175,8 +207,9 @@ public class FairSchedulerPage extends R
" });",
" $('#cs').bind('select_node.jstree', function(e, data) {",
" var q = $('.q', data.rslt.obj).first().text();",
- " if (q == 'root') q = '';",
- " $('#apps').dataTable().fnFilter(q, 3);",
+ " if (q == 'root') q = '';",
+ " else q = '^' + q.substr(q.lastIndexOf('.') + 1) + '$';",
+ " $('#apps').dataTable().fnFilter(q, 3, true);",
" });",
" $('#cs').show();",
"});")._();
@@ -197,4 +230,19 @@ public class FairSchedulerPage extends R
static String left(float f) {
return String.format("left:%.1f%%", f * 100);
}
+
+ @Override
+ protected String getAppsTableColumnDefs() {
+ StringBuilder sb = new StringBuilder();
+ return sb
+ .append("[\n")
+ .append("{'sType':'numeric', 'aTargets': [0]")
+ .append(", 'mRender': parseHadoopID }")
+
+ .append("\n, {'sType':'numeric', 'aTargets': [5, 6]")
+ .append(", 'mRender': renderHadoopDate }")
+
+ .append("\n, {'sType':'numeric', bSearchable:false, 'aTargets': [9]")
+ .append(", 'mRender': parseHadoopProgress }]").toString();
+ }
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmView.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmView.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmView.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmView.java Fri Jan 11 19:40:23 2013
@@ -66,7 +66,17 @@ public class RmView extends TwoColumnLay
.append(", bDeferRender: true")
.append(", bProcessing: true")
- .append("\n, aoColumnDefs: [\n")
+ .append("\n, aoColumnDefs: ")
+ .append(getAppsTableColumnDefs())
+
+ // Sort by id upon page load
+ .append(", aaSorting: [[0, 'desc']]}").toString();
+ }
+
+ protected String getAppsTableColumnDefs() {
+ StringBuilder sb = new StringBuilder();
+ return sb
+ .append("[\n")
.append("{'sType':'numeric', 'aTargets': [0]")
.append(", 'mRender': parseHadoopID }")
@@ -74,9 +84,6 @@ public class RmView extends TwoColumnLay
.append(", 'mRender': renderHadoopDate }")
.append("\n, {'sType':'numeric', bSearchable:false, 'aTargets': [8]")
- .append(", 'mRender': parseHadoopProgress }]")
-
- // Sort by id upon page load
- .append(", aaSorting: [[0, 'desc']]}").toString();
+ .append(", 'mRender': parseHadoopProgress }]").toString();
}
}