You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ac...@apache.org on 2012/12/19 05:21:24 UTC
svn commit: r1423758 [2/2] - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/
hadoop-yarn...
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java Wed Dec 19 04:21:18 2012
@@ -44,6 +44,12 @@ public interface RMApp extends EventHand
* @return the {@link ApplicationId} for this {@link RMApp}.
*/
ApplicationId getApplicationId();
+
+ /**
+ * The application submission context for this {@link RMApp}
+ * @return the {@link ApplicationSubmissionContext} for this {@link RMApp}
+ */
+ ApplicationSubmissionContext getApplicationSubmissionContext();
/**
* The current state of the {@link RMApp}.
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Wed Dec 19 04:21:18 2012
@@ -50,6 +50,9 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
@@ -66,7 +69,7 @@ import org.apache.hadoop.yarn.state.Stat
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
-public class RMAppImpl implements RMApp {
+public class RMAppImpl implements RMApp, Recoverable {
private static final Log LOG = LogFactory.getLog(RMAppImpl.class);
private static final String UNAVAILABLE = "N/A";
@@ -243,6 +246,11 @@ public class RMAppImpl implements RMApp
public ApplicationId getApplicationId() {
return this.applicationId;
}
+
+ @Override
+ public ApplicationSubmissionContext getApplicationSubmissionContext() {
+ return this.submissionContext;
+ }
@Override
public FinalApplicationStatus getFinalApplicationStatus() {
@@ -512,9 +520,22 @@ public class RMAppImpl implements RMApp
this.writeLock.unlock();
}
}
+
+ @Override
+ public void recover(RMState state) {
+ ApplicationState appState = state.getApplicationState().get(getApplicationId());
+ LOG.info("Recovering app: " + getApplicationId() + " with "
+ + appState.getAttemptCount() + " attempts");
+ for(int i=0; i<appState.getAttemptCount(); ++i) {
+ // create the attempt without sending it a START event
+ createNewAttempt(false);
+ // let the newly created current attempt restore itself from the saved state
+ ((RMAppAttemptImpl) currentAttempt).recover(state);
+ }
+ }
@SuppressWarnings("unchecked")
- private void createNewAttempt() {
+ private void createNewAttempt(boolean startAttempt) {
ApplicationAttemptId appAttemptId = Records
.newRecord(ApplicationAttemptId.class);
appAttemptId.setApplicationId(applicationId);
@@ -525,8 +546,10 @@ public class RMAppImpl implements RMApp
submissionContext, conf);
attempts.put(appAttemptId, attempt);
currentAttempt = attempt;
- handler.handle(
- new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
+ if(startAttempt) {
+ handler.handle(
+ new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
+ }
}
private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) {
@@ -553,7 +576,7 @@ public class RMAppImpl implements RMApp
private static final class StartAppAttemptTransition extends RMAppTransition {
public void transition(RMAppImpl app, RMAppEvent event) {
- app.createNewAttempt();
+ app.createNewAttempt(true);
};
}
@@ -647,7 +670,7 @@ public class RMAppImpl implements RMApp
msg = "Unmanaged application " + app.getApplicationId()
+ " failed due to " + failedEvent.getDiagnostics()
+ ". Failing the application.";
- } else if (app.attempts.size() == app.maxRetries) {
+ } else if (app.attempts.size() >= app.maxRetries) {
retryApp = false;
msg = "Application " + app.getApplicationId() + " failed "
+ app.maxRetries + " times due to " + failedEvent.getDiagnostics()
@@ -655,7 +678,7 @@ public class RMAppImpl implements RMApp
}
if (retryApp) {
- app.createNewAttempt();
+ app.createNewAttempt(true);
return initialState;
} else {
LOG.info(msg);
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java Wed Dec 19 04:21:18 2012
@@ -39,9 +39,15 @@ public enum RMAppAttemptEventType {
CONTAINER_ACQUIRED,
CONTAINER_ALLOCATED,
CONTAINER_FINISHED,
+
+ // Source: RMStateStore
+ ATTEMPT_SAVED,
// Source: Scheduler
APP_REJECTED,
APP_ACCEPTED,
+
+ // Source: RMAppAttemptImpl.recover
+ RECOVER
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Wed Dec 19 04:21:18 2012
@@ -38,6 +38,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -57,6 +58,11 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -69,6 +75,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -85,7 +92,7 @@ import org.apache.hadoop.yarn.state.Stat
import org.apache.hadoop.yarn.util.BuilderUtils;
@SuppressWarnings({"unchecked", "rawtypes"})
-public class RMAppAttemptImpl implements RMAppAttempt {
+public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private static final Log LOG = LogFactory.getLog(RMAppAttemptImpl.class);
@@ -153,12 +160,15 @@ public class RMAppAttemptImpl implements
.addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FAILED,
RMAppAttemptEventType.REGISTERED,
new UnexpectedAMRegisteredTransition())
+ .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.RECOVERED,
+ RMAppAttemptEventType.RECOVER)
// Transitions from SUBMITTED state
.addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FAILED,
RMAppAttemptEventType.APP_REJECTED, new AppRejectedTransition())
.addTransition(RMAppAttemptState.SUBMITTED,
- EnumSet.of(RMAppAttemptState.LAUNCHED, RMAppAttemptState.SCHEDULED),
+ EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
+ RMAppAttemptState.SCHEDULED),
RMAppAttemptEventType.APP_ACCEPTED,
new ScheduleTransition())
.addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.KILLED,
@@ -170,12 +180,42 @@ public class RMAppAttemptImpl implements
// Transitions from SCHEDULED State
.addTransition(RMAppAttemptState.SCHEDULED,
- RMAppAttemptState.ALLOCATED,
+ RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptEventType.CONTAINER_ALLOCATED,
new AMContainerAllocatedTransition())
.addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.KILLED,
RMAppAttemptEventType.KILL,
new BaseFinalTransition(RMAppAttemptState.KILLED))
+
+ // Transitions from ALLOCATED_SAVING State
+ .addTransition(RMAppAttemptState.ALLOCATED_SAVING,
+ RMAppAttemptState.ALLOCATED,
+ RMAppAttemptEventType.ATTEMPT_SAVED, new AttemptStoredTransition())
+ .addTransition(RMAppAttemptState.ALLOCATED_SAVING,
+ RMAppAttemptState.ALLOCATED_SAVING,
+ RMAppAttemptEventType.CONTAINER_ACQUIRED,
+ new ContainerAcquiredTransition())
+ // App could be killed by the client. So need to handle this.
+ .addTransition(RMAppAttemptState.ALLOCATED_SAVING,
+ RMAppAttemptState.KILLED,
+ RMAppAttemptEventType.KILL,
+ new BaseFinalTransition(RMAppAttemptState.KILLED))
+
+ // Transitions from LAUNCHED_UNMANAGED_SAVING State
+ .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
+ RMAppAttemptState.LAUNCHED,
+ RMAppAttemptEventType.ATTEMPT_SAVED,
+ new UnmanagedAMAttemptSavedTransition())
+ // attempt should not try to register in this state
+ .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
+ RMAppAttemptState.FAILED,
+ RMAppAttemptEventType.REGISTERED,
+ new UnexpectedAMRegisteredTransition())
+ // App could be killed by the client. So need to handle this.
+ .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
+ RMAppAttemptState.KILLED,
+ RMAppAttemptEventType.KILL,
+ new BaseFinalTransition(RMAppAttemptState.KILLED))
// Transitions from ALLOCATED State
.addTransition(RMAppAttemptState.ALLOCATED,
@@ -279,11 +319,30 @@ public class RMAppAttemptImpl implements
RMAppAttemptEventType.EXPIRE,
RMAppAttemptEventType.REGISTERED,
RMAppAttemptEventType.CONTAINER_ALLOCATED,
+ RMAppAttemptEventType.ATTEMPT_SAVED,
+ RMAppAttemptEventType.CONTAINER_FINISHED,
+ RMAppAttemptEventType.UNREGISTERED,
+ RMAppAttemptEventType.KILL,
+ RMAppAttemptEventType.STATUS_UPDATE))
+
+ // Transitions from RECOVERED State
+ .addTransition(
+ RMAppAttemptState.RECOVERED,
+ RMAppAttemptState.RECOVERED,
+ EnumSet.of(RMAppAttemptEventType.START,
+ RMAppAttemptEventType.APP_ACCEPTED,
+ RMAppAttemptEventType.APP_REJECTED,
+ RMAppAttemptEventType.EXPIRE,
+ RMAppAttemptEventType.LAUNCHED,
+ RMAppAttemptEventType.LAUNCH_FAILED,
+ RMAppAttemptEventType.REGISTERED,
+ RMAppAttemptEventType.CONTAINER_ALLOCATED,
+ RMAppAttemptEventType.CONTAINER_ACQUIRED,
+ RMAppAttemptEventType.ATTEMPT_SAVED,
RMAppAttemptEventType.CONTAINER_FINISHED,
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.KILL,
RMAppAttemptEventType.STATUS_UPDATE))
-
.installTopology();
public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
@@ -318,7 +377,7 @@ public class RMAppAttemptImpl implements
@Override
public ApplicationSubmissionContext getSubmissionContext() {
return this.submissionContext;
- }
+ }
@Override
public FinalApplicationStatus getFinalApplicationStatus() {
@@ -494,6 +553,10 @@ public class RMAppAttemptImpl implements
}
}
+ private void setMasterContainer(Container container) {
+ masterContainer = container;
+ }
+
@Override
public void handle(RMAppAttemptEvent event) {
@@ -561,6 +624,21 @@ public class RMAppAttemptImpl implements
}
}
+ @Override
+ public void recover(RMState state) {
+ ApplicationState appState =
+ state.getApplicationState().get(getAppAttemptId().getApplicationId());
+ ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId());
+ assert attemptState != null;
+ setMasterContainer(attemptState.getMasterContainer());
+ LOG.info("Recovered attempt: AppId: " + getAppAttemptId().getApplicationId()
+ + " AttemptId: " + getAppAttemptId()
+ + " MasterContainer: " + masterContainer);
+ setDiagnostics("Attempt recovered after RM restart");
+ handle(new RMAppAttemptEvent(getAppAttemptId(),
+ RMAppAttemptEventType.RECOVER));
+ }
+
private static class BaseTransition implements
SingleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent> {
@@ -625,13 +703,12 @@ public class RMAppAttemptImpl implements
@Override
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
-
- // Send the acceptance to the app
- appAttempt.eventHandler.handle(new RMAppEvent(event
- .getApplicationAttemptId().getApplicationId(),
- RMAppEventType.APP_ACCEPTED));
-
if (!appAttempt.submissionContext.getUnmanagedAM()) {
+ // Send the acceptance to the app
+ appAttempt.eventHandler.handle(new RMAppEvent(event
+ .getApplicationAttemptId().getApplicationId(),
+ RMAppEventType.APP_ACCEPTED));
+
// Request a container for the AM.
ResourceRequest request = BuilderUtils.newResourceRequest(
AM_CONTAINER_PRIORITY, "*", appAttempt.submissionContext
@@ -647,35 +724,42 @@ public class RMAppAttemptImpl implements
return RMAppAttemptState.SCHEDULED;
} else {
// RM not allocating container. AM is self launched.
- // Directly go to LAUNCHED state
- // Register with AMLivelinessMonitor
- appAttempt.rmContext.getAMLivelinessMonitor().register(
- appAttempt.applicationAttemptId);
- return RMAppAttemptState.LAUNCHED;
+ RMStateStore store = appAttempt.rmContext.getStateStore();
+ // save state and then go to LAUNCHED state
+ appAttempt.storeAttempt(store);
+ return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING;
}
}
}
- private static final class AMContainerAllocatedTransition extends BaseTransition {
+ private static final class AMContainerAllocatedTransition
+ extends BaseTransition {
@Override
public void transition(RMAppAttemptImpl appAttempt,
- RMAppAttemptEvent event) {
-
+ RMAppAttemptEvent event) {
// Acquire the AM container from the scheduler.
Allocation amContainerAllocation = appAttempt.scheduler.allocate(
appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST,
EMPTY_CONTAINER_RELEASE_LIST);
// Set the masterContainer
- appAttempt.masterContainer = amContainerAllocation.getContainers().get(
- 0);
+ appAttempt.setMasterContainer(amContainerAllocation.getContainers().get(
+ 0));
- // Send event to launch the AM Container
- appAttempt.eventHandler.handle(new AMLauncherEvent(
- AMLauncherEventType.LAUNCH, appAttempt));
+ RMStateStore store = appAttempt.rmContext.getStateStore();
+ appAttempt.storeAttempt(store);
}
}
-
+
+ private static final class AttemptStoredTransition extends BaseTransition {
+ @Override
+ public void transition(RMAppAttemptImpl appAttempt,
+ RMAppAttemptEvent event) {
+ appAttempt.checkAttemptStoreError(event);
+ appAttempt.launchAttempt();
+ }
+ }
+
private static class BaseFinalTransition extends BaseTransition {
private final RMAppAttemptState finalAttemptState;
@@ -736,17 +820,34 @@ public class RMAppAttemptImpl implements
}
}
- private static final class AMLaunchedTransition extends BaseTransition {
+ private static class AMLaunchedTransition extends BaseTransition {
@Override
public void transition(RMAppAttemptImpl appAttempt,
- RMAppAttemptEvent event) {
-
+ RMAppAttemptEvent event) {
// Register with AMLivelinessMonitor
- appAttempt.rmContext.getAMLivelinessMonitor().register(
- appAttempt.applicationAttemptId);
-
+ appAttempt.attemptLaunched();
}
}
+
+ private static final class UnmanagedAMAttemptSavedTransition
+ extends AMLaunchedTransition {
+ @Override
+ public void transition(RMAppAttemptImpl appAttempt,
+ RMAppAttemptEvent event) {
+ appAttempt.checkAttemptStoreError(event);
+ // Send the acceptance to the app
+ // Ideally this should have been done when the scheduler accepted the app.
+ // But it's here because until the attempt is saved the client should not
+ // launch the unmanaged AM. Client waits for the app status to be accepted
+ // before doing so. So we have to delay the accepted state until we have
+ // completed storing the attempt
+ appAttempt.eventHandler.handle(new RMAppEvent(event
+ .getApplicationAttemptId().getApplicationId(),
+ RMAppEventType.APP_ACCEPTED));
+
+ super.transition(appAttempt, event);
+ }
+ }
private static final class LaunchFailedTransition extends BaseFinalTransition {
@@ -1040,4 +1141,37 @@ public class RMAppAttemptImpl implements
this.readLock.unlock();
}
}
+
+ private void launchAttempt(){
+ // Send event to launch the AM Container
+ eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this));
+ }
+
+ private void attemptLaunched() {
+ // Register with AMLivelinessMonitor
+ rmContext.getAMLivelinessMonitor().register(getAppAttemptId());
+ }
+
+ private void checkAttemptStoreError(RMAppAttemptEvent event) {
+ RMAppAttemptStoredEvent storeEvent = (RMAppAttemptStoredEvent) event;
+ if(storeEvent.getStoredException() != null)
+ {
+ // This needs to be handled for HA and give up master status if we got
+ // fenced
+ LOG.error("Failed to store attempt: " + getAppAttemptId(),
+ storeEvent.getStoredException());
+ ExitUtil.terminate(1, storeEvent.getStoredException());
+ }
+ }
+
+ private void storeAttempt(RMStateStore store) {
+ // store attempt data in a non-blocking manner to prevent dispatcher
+ // thread starvation and wait for state to be saved
+ LOG.info("Storing attempt: AppId: " +
+ getAppAttemptId().getApplicationId()
+ + " AttemptId: " +
+ getAppAttemptId()
+ + " MasterContainer: " + masterContainer);
+ store.storeApplicationAttempt(this);
+ }
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java Wed Dec 19 04:21:18 2012
@@ -19,6 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
public enum RMAppAttemptState {
- NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING,
- FINISHING, FINISHED, KILLED,
+ NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING, FINISHING,
+ FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, RECOVERED
}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptStoredEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptStoredEvent.java?rev=1423758&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptStoredEvent.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptStoredEvent.java Wed Dec 19 04:21:18 2012
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+
+public class RMAppAttemptStoredEvent extends RMAppAttemptEvent {
+
+ final Exception storedException;
+
+ public RMAppAttemptStoredEvent(ApplicationAttemptId appAttemptId,
+ Exception storedException) {
+ super(appAttemptId, RMAppAttemptEventType.ATTEMPT_SAVED);
+ this.storedException = storedException;
+ }
+
+ public Exception getStoredException() {
+ return storedException;
+ }
+
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java Wed Dec 19 04:21:18 2012
@@ -47,7 +47,7 @@ public class MockAM {
private volatile int responseId = 0;
private final ApplicationAttemptId attemptId;
private final RMContext context;
- private final AMRMProtocol amRMProtocol;
+ private AMRMProtocol amRMProtocol;
private final List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
private final List<ContainerId> releases = new ArrayList<ContainerId>();
@@ -58,6 +58,10 @@ public class MockAM {
this.amRMProtocol = amRMProtocol;
this.attemptId = attemptId;
}
+
+ void setAMRMProtocol(AMRMProtocol amRMProtocol) {
+ this.amRMProtocol = amRMProtocol;
+ }
public void waitForState(RMAppAttemptState finalState) throws Exception {
RMApp app = context.getRMApps().get(attemptId.getApplicationId());
@@ -66,7 +70,8 @@ public class MockAM {
while (!finalState.equals(attempt.getAppAttemptState())
&& timeoutSecs++ < 20) {
System.out
- .println("AppAttempt State is : " + attempt.getAppAttemptState()
+ .println("AppAttempt : " + attemptId + " State is : "
+ + attempt.getAppAttemptState()
+ " Waiting for state : " + finalState);
Thread.sleep(500);
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java Wed Dec 19 04:21:18 2012
@@ -46,7 +46,7 @@ public class MockNM {
private int responseId;
private NodeId nodeId;
private final int memory;
- private final ResourceTrackerService resourceTracker;
+ private ResourceTrackerService resourceTracker;
private final int httpPort = 2;
private MasterKey currentMasterKey;
@@ -66,6 +66,10 @@ public class MockNM {
public int getHttpPort() {
return httpPort;
}
+
+ void setResourceTrackerService(ResourceTrackerService resourceTracker) {
+ this.resourceTracker = resourceTracker;
+ }
public void containerStatus(Container container) throws Exception {
Map<ApplicationId, List<ContainerStatus>> conts =
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Wed Dec 19 04:21:18 2012
@@ -39,9 +39,10 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -63,10 +64,17 @@ public class MockRM extends ResourceMana
}
public MockRM(Configuration conf) {
- super(StoreFactory.getStore(conf));
+ this(conf, null);
+ }
+
+ public MockRM(Configuration conf, RMStateStore store) {
+ super();
init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
+ if(store != null) {
+ setRMStateStore(store);
+ }
Logger rootLogger = LogManager.getRootLogger();
- rootLogger.setLevel(Level.DEBUG);
+ rootLogger.setLevel(Level.DEBUG);
}
public void waitForState(ApplicationId appId, RMAppState finalState)
@@ -75,7 +83,7 @@ public class MockRM extends ResourceMana
Assert.assertNotNull("app shouldn't be null", app);
int timeoutSecs = 0;
while (!finalState.equals(app.getState()) && timeoutSecs++ < 20) {
- System.out.println("App State is : " + app.getState()
+ System.out.println("App : " + appId + " State is : " + app.getState()
+ " Waiting for state : " + finalState);
Thread.sleep(500);
}
@@ -83,6 +91,24 @@ public class MockRM extends ResourceMana
Assert.assertEquals("App state is not correct (timedout)", finalState,
app.getState());
}
+
+ public void waitForState(ApplicationAttemptId attemptId,
+ RMAppAttemptState finalState)
+ throws Exception {
+ RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId());
+ Assert.assertNotNull("app shouldn't be null", app);
+ RMAppAttempt attempt = app.getCurrentAppAttempt();
+ int timeoutSecs = 0;
+ while (!finalState.equals(attempt.getAppAttemptState()) && timeoutSecs++ < 20) {
+ System.out.println("AppAttempt : " + attemptId
+ + " State is : " + attempt.getAppAttemptState()
+ + " Waiting for state : " + finalState);
+ Thread.sleep(500);
+ }
+ System.out.println("Attempt State is : " + attempt.getAppAttemptState());
+ Assert.assertEquals("Attempt state is not correct (timedout)", finalState,
+ attempt.getAppAttemptState());
+ }
// get new application id
public GetNewApplicationResponse getNewAppId() throws Exception {
@@ -97,11 +123,16 @@ public class MockRM extends ResourceMana
// client
public RMApp submitApp(int masterMemory, String name, String user) throws Exception {
- return submitApp(masterMemory, name, user, null);
+ return submitApp(masterMemory, name, user, null, false);
}
-
+
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls) throws Exception {
+ return submitApp(masterMemory, name, user, acls, false);
+ }
+
+ public RMApp submitApp(int masterMemory, String name, String user,
+ Map<ApplicationAccessType, String> acls, boolean unmanaged) throws Exception {
ClientRMProtocol client = getClientRMService();
GetNewApplicationResponse resp = client.getNewApplication(Records
.newRecord(GetNewApplicationRequest.class));
@@ -114,6 +145,9 @@ public class MockRM extends ResourceMana
sub.setApplicationId(appId);
sub.setApplicationName(name);
sub.setUser(user);
+ if(unmanaged) {
+ sub.setUnmanagedAM(true);
+ }
ContainerLaunchContext clc = Records
.newRecord(ContainerLaunchContext.class);
Resource capability = Records.newRecord(Resource.class);
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java Wed Dec 19 04:21:18 2012
@@ -51,7 +51,7 @@ import org.apache.hadoop.yarn.factories.
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.service.Service.STATE;
import org.apache.hadoop.yarn.util.BuilderUtils;
@@ -85,7 +85,7 @@ public class TestApplicationACLs {
@BeforeClass
public static void setup() throws InterruptedException, IOException {
- RMStateStore store = StoreFactory.getStore(conf);
+ RMStateStore store = RMStateStoreFactory.getStore(conf);
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
AccessControlList adminACL = new AccessControlList("");
adminACL.addGroup(SUPER_GROUP);
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1423758&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Wed Dec 19 04:21:18 2012
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRMRestart {
+
+ @Test
+ public void testRMRestart() throws Exception {
+ Logger rootLogger = LogManager.getRootLogger();
+ rootLogger.setLevel(Level.DEBUG);
+ ExitUtil.disableSystemExit();
+
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
+ conf.set(YarnConfiguration.RM_STORE,
+ "org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore");
+ conf.set(YarnConfiguration.RM_SCHEDULER,
+ "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
+
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ RMState rmState = memStore.getState();
+ Map<ApplicationId, ApplicationState> rmAppState =
+ rmState.getApplicationState();
+
+
+ // PHASE 1: create state in an RM
+
+ // start RM
+ MockRM rm1 = new MockRM(conf, memStore);
+
+ // start like normal because state is empty
+ rm1.start();
+
+ MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
+ MockNM nm2 = new MockNM("h2:5678", 15120, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ nm2.registerNode(); // nm2 will not heartbeat with RM1
+
+ // create app that will be removed from the store once it finishes
+ RMApp app0 = rm1.submitApp(200);
+ RMAppAttempt attempt0 = app0.getCurrentAppAttempt();
+ // spot check that app is saved
+ Assert.assertEquals(1, rmAppState.size());
+ nm1.nodeHeartbeat(true);
+ MockAM am0 = rm1.sendAMLaunched(attempt0.getAppAttemptId());
+ am0.registerAppAttempt();
+ am0.unregisterAppAttempt();
+ nm1.nodeHeartbeat(attempt0.getAppAttemptId(), 1, ContainerState.COMPLETE);
+ am0.waitForState(RMAppAttemptState.FINISHED);
+ rm1.waitForState(app0.getApplicationId(), RMAppState.FINISHED);
+
+ // spot check that app is not saved anymore
+ Assert.assertEquals(0, rmAppState.size());
+
+ // create app that gets launched and does allocate before RM restart
+ RMApp app1 = rm1.submitApp(200);
+ // assert app1 info is saved
+ ApplicationState appState = rmAppState.get(app1.getApplicationId());
+ Assert.assertNotNull(appState);
+ Assert.assertEquals(0, appState.getAttemptCount());
+ Assert.assertEquals(appState.getApplicationSubmissionContext()
+ .getApplicationId(), app1.getApplicationSubmissionContext()
+ .getApplicationId());
+
+ // kick the scheduler to allocate the AM container
+ nm1.nodeHeartbeat(true);
+
+ // assert app1 attempt is saved
+ RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+ ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId();
+ rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
+ Assert.assertEquals(1, appState.getAttemptCount());
+ ApplicationAttemptState attemptState =
+ appState.getAttempt(attemptId1);
+ Assert.assertNotNull(attemptState);
+ Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
+ attemptState.getMasterContainer().getId());
+
+ // launch the AM
+ MockAM am1 = rm1.sendAMLaunched(attempt1.getAppAttemptId());
+ am1.registerAppAttempt();
+
+ // AM request for containers
+ am1.allocate("h1" , 1000, 1, new ArrayList<ContainerId>());
+ // kick the scheduler
+ nm1.nodeHeartbeat(true);
+ List<Container> conts = am1.allocate(new ArrayList<ResourceRequest>(),
+ new ArrayList<ContainerId>()).getAllocatedContainers();
+ while (conts.size() == 0) {
+ nm1.nodeHeartbeat(true);
+ conts.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
+ new ArrayList<ContainerId>()).getAllocatedContainers());
+ Thread.sleep(500);
+ }
+
+ // create app that does not get launched by RM before RM restart
+ RMApp app2 = rm1.submitApp(200);
+
+ // assert app2 info is saved
+ appState = rmAppState.get(app2.getApplicationId());
+ Assert.assertNotNull(appState);
+ Assert.assertEquals(0, appState.getAttemptCount());
+ Assert.assertEquals(appState.getApplicationSubmissionContext()
+ .getApplicationId(), app2.getApplicationSubmissionContext()
+ .getApplicationId());
+
+ // create unmanaged app
+ RMApp appUnmanaged = rm1.submitApp(200, "", "", null, true);
+ ApplicationAttemptId unmanagedAttemptId =
+ appUnmanaged.getCurrentAppAttempt().getAppAttemptId();
+ // assert appUnmanaged info is saved
+ ApplicationId unmanagedAppId = appUnmanaged.getApplicationId();
+ appState = rmAppState.get(unmanagedAppId);
+ Assert.assertNotNull(appState);
+ // wait for attempt to reach LAUNCHED state
+ rm1.waitForState(unmanagedAttemptId, RMAppAttemptState.LAUNCHED);
+ rm1.waitForState(unmanagedAppId, RMAppState.ACCEPTED);
+ // assert unmanaged attempt info is saved
+ Assert.assertEquals(1, appState.getAttemptCount());
+ Assert.assertEquals(appState.getApplicationSubmissionContext()
+ .getApplicationId(), appUnmanaged.getApplicationSubmissionContext()
+ .getApplicationId());
+
+
+ // PHASE 2: create new RM and start from old state
+
+ // create new RM to represent restart and recover state
+ MockRM rm2 = new MockRM(conf, memStore);
+
+ // start new RM
+ rm2.start();
+
+ // change NM to point to new RM
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+ nm2.setResourceTrackerService(rm2.getResourceTrackerService());
+
+ // verify load of old state
+ // only 2 apps are loaded: the unmanaged app is not loaded back because it
+ // cannot be restarted by the RM. This will change with work-preserving RM
+ // restart, in which AMs/NMs are not rebooted
+ Assert.assertEquals(2, rm2.getRMContext().getRMApps().size());
+
+ // verify correct number of attempts and other data
+ RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
+ Assert.assertNotNull(loadedApp1);
+ //Assert.assertEquals(1, loadedApp1.getAppAttempts().size());
+ Assert.assertEquals(app1.getApplicationSubmissionContext()
+ .getApplicationId(), loadedApp1.getApplicationSubmissionContext()
+ .getApplicationId());
+
+ RMApp loadedApp2 = rm2.getRMContext().getRMApps().get(app2.getApplicationId());
+ Assert.assertNotNull(loadedApp2);
+ //Assert.assertEquals(0, loadedApp2.getAppAttempts().size());
+ Assert.assertEquals(app2.getApplicationSubmissionContext()
+ .getApplicationId(), loadedApp2.getApplicationSubmissionContext()
+ .getApplicationId());
+
+ // verify state machine kicked into expected states
+ rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
+ rm2.waitForState(loadedApp2.getApplicationId(), RMAppState.ACCEPTED);
+
+ // verify new attempts created
+ Assert.assertEquals(2, loadedApp1.getAppAttempts().size());
+ Assert.assertEquals(1, loadedApp2.getAppAttempts().size());
+
+ // verify old AM is not accepted
+ // change running AM to talk to new RM
+ am1.setAMRMProtocol(rm2.getApplicationMasterService());
+ AMResponse amResponse = am1.allocate(new ArrayList<ResourceRequest>(),
+ new ArrayList<ContainerId>());
+ Assert.assertTrue(amResponse.getReboot());
+
+ // NM should be rebooted on heartbeat, even first heartbeat for nm2
+ HeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
+ Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction());
+ hbResponse = nm2.nodeHeartbeat(true);
+ Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction());
+
+ // new NM to represent NM re-register
+ nm1 = rm2.registerNode("h1:1234", 15120);
+ nm2 = rm2.registerNode("h2:5678", 15120);
+
+ // verify no more reboot response sent
+ hbResponse = nm1.nodeHeartbeat(true);
+ Assert.assertTrue(NodeAction.REBOOT != hbResponse.getNodeAction());
+ hbResponse = nm2.nodeHeartbeat(true);
+ Assert.assertTrue(NodeAction.REBOOT != hbResponse.getNodeAction());
+
+ // assert app1 attempt is saved
+ attempt1 = loadedApp1.getCurrentAppAttempt();
+ attemptId1 = attempt1.getAppAttemptId();
+ rm2.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
+ appState = rmAppState.get(loadedApp1.getApplicationId());
+ attemptState = appState.getAttempt(attemptId1);
+ Assert.assertNotNull(attemptState);
+ Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
+ attemptState.getMasterContainer().getId());
+
+ // Nodes on which the AMs run
+ MockNM am1Node = nm1;
+ if(attemptState.getMasterContainer().getNodeId().toString().contains("h2")){
+ am1Node = nm2;
+ }
+
+ // assert app2 attempt is saved
+ RMAppAttempt attempt2 = loadedApp2.getCurrentAppAttempt();
+ ApplicationAttemptId attemptId2 = attempt2.getAppAttemptId();
+ rm2.waitForState(attemptId2, RMAppAttemptState.ALLOCATED);
+ appState = rmAppState.get(loadedApp2.getApplicationId());
+ attemptState = appState.getAttempt(attemptId2);
+ Assert.assertNotNull(attemptState);
+ Assert.assertEquals(BuilderUtils.newContainerId(attemptId2, 1),
+ attemptState.getMasterContainer().getId());
+
+ MockNM am2Node = nm1;
+ if(attemptState.getMasterContainer().getNodeId().toString().contains("h2")){
+ am2Node = nm2;
+ }
+
+ // start the AMs
+ am1 = rm2.sendAMLaunched(attempt1.getAppAttemptId());
+ am1.registerAppAttempt();
+
+ MockAM am2 = rm2.sendAMLaunched(attempt2.getAppAttemptId());
+ am2.registerAppAttempt();
+
+ //request for containers
+ am1.allocate("h1" , 1000, 3, new ArrayList<ContainerId>());
+ am2.allocate("h2" , 1000, 1, new ArrayList<ContainerId>());
+
+ // verify container allocate continues to work
+ nm1.nodeHeartbeat(true);
+ nm2.nodeHeartbeat(true);
+ conts = am1.allocate(new ArrayList<ResourceRequest>(),
+ new ArrayList<ContainerId>()).getAllocatedContainers();
+ while (conts.size() == 0) {
+ nm1.nodeHeartbeat(true);
+ nm2.nodeHeartbeat(true);
+ conts.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
+ new ArrayList<ContainerId>()).getAllocatedContainers());
+ Thread.sleep(500);
+ }
+
+ // finish the AMs
+ am1.unregisterAppAttempt();
+ am1Node.nodeHeartbeat(attempt1.getAppAttemptId(), 1, ContainerState.COMPLETE);
+ am1.waitForState(RMAppAttemptState.FINISHED);
+
+ am2.unregisterAppAttempt();
+ am2Node.nodeHeartbeat(attempt2.getAppAttemptId(), 1, ContainerState.COMPLETE);
+ am2.waitForState(RMAppAttemptState.FINISHED);
+
+ // stop the RMs
+ rm2.stop();
+ rm1.stop();
+
+ // completed apps should be removed
+ Assert.assertEquals(0, rmAppState.size());
+ }
+
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java Wed Dec 19 04:21:18 2012
@@ -31,8 +31,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.junit.After;
@@ -47,8 +45,7 @@ public class TestResourceManager {
@Before
public void setUp() throws Exception {
Configuration conf = new YarnConfiguration();
- RMStateStore store = StoreFactory.getStore(conf);
- resourceManager = new ResourceManager(store);
+ resourceManager = new ResourceManager();
resourceManager.init(conf);
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java Wed Dec 19 04:21:18 2012
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationMaster;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationStatus;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -153,6 +154,11 @@ public abstract class MockAsm extends Mo
}
@Override
+ public ApplicationSubmissionContext getApplicationSubmissionContext() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
public String getName() {
throw new UnsupportedOperationException("Not supported yet.");
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java Wed Dec 19 04:21:18 2012
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.factory.pr
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.junit.After;
import org.junit.Before;
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java Wed Dec 19 04:21:18 2012
@@ -44,7 +44,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java Wed Dec 19 04:21:18 2012
@@ -24,9 +24,11 @@ import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -67,6 +69,11 @@ public class MockRMApp implements RMApp
public ApplicationId getApplicationId() {
return id;
}
+
+ @Override
+ public ApplicationSubmissionContext getApplicationSubmissionContext() {
+ return new ApplicationSubmissionContextPBImpl();
+ }
@Override
public RMAppState getState() {
@@ -118,7 +125,9 @@ public class MockRMApp implements RMApp
public Map<ApplicationAttemptId, RMAppAttempt> getAppAttempts() {
Map<ApplicationAttemptId, RMAppAttempt> attempts =
new LinkedHashMap<ApplicationAttemptId, RMAppAttempt>();
- attempts.put(attempt.getAppAttemptId(), attempt);
+ if(attempt != null) {
+ attempts.put(attempt.getAppAttemptId(), attempt);
+ }
return attempts;
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Wed Dec 19 04:21:18 2012
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -65,6 +66,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -167,6 +169,9 @@ public class TestRMAppAttemptTransitions
new RMContainerTokenSecretManager(conf),
new ClientToAMTokenSecretManagerInRM());
+ RMStateStore store = mock(RMStateStore.class);
+ ((RMContextImpl) rmContext).setStateStore(store);
+
scheduler = mock(YarnScheduler.class);
masterService = mock(ApplicationMasterService.class);
applicationMasterLauncher = mock(ApplicationMasterLauncher.class);
@@ -295,6 +300,14 @@ public class TestRMAppAttemptTransitions
assertEquals(0, applicationAttempt.getRanNodes().size());
assertNull(applicationAttempt.getFinalApplicationStatus());
}
+
+ /**
+ * {@link RMAppAttemptState#RECOVERED}
+ */
+ private void testAppAttemptRecoveredState() {
+ assertEquals(RMAppAttemptState.RECOVERED,
+ applicationAttempt.getAppAttemptState());
+ }
/**
* {@link RMAppAttemptState#SCHEDULED}
@@ -438,6 +451,15 @@ public class TestRMAppAttemptTransitions
new RMAppAttemptEvent(
applicationAttempt.getAppAttemptId(),
RMAppAttemptEventType.APP_ACCEPTED));
+
+ if(unmanagedAM){
+ assertEquals(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
+ applicationAttempt.getAppAttemptState());
+ applicationAttempt.handle(
+ new RMAppAttemptStoredEvent(
+ applicationAttempt.getAppAttemptId(), null));
+ }
+
testAppAttemptScheduledState();
}
@@ -463,6 +485,12 @@ public class TestRMAppAttemptTransitions
applicationAttempt.getAppAttemptId(),
container));
+ assertEquals(RMAppAttemptState.ALLOCATED_SAVING,
+ applicationAttempt.getAppAttemptState());
+ applicationAttempt.handle(
+ new RMAppAttemptStoredEvent(
+ applicationAttempt.getAppAttemptId(), null));
+
testAppAttemptAllocatedState(container);
return container;
@@ -555,6 +583,15 @@ public class TestRMAppAttemptTransitions
}
@Test
+ public void testNewToRecovered() {
+ applicationAttempt.handle(
+ new RMAppAttemptEvent(
+ applicationAttempt.getAppAttemptId(),
+ RMAppAttemptEventType.RECOVER));
+ testAppAttemptRecoveredState();
+ }
+
+ @Test
public void testSubmittedToFailed() {
submitApplicationAttempt();
String message = "Rejected";
@@ -604,7 +641,7 @@ public class TestRMAppAttemptTransitions
diagnostics));
testAppAttemptFailedState(amContainer, diagnostics);
}
-
+
@Test
public void testRunningToFailed() {
Container amContainer = allocateApplicationAttempt();
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Wed Dec 19 04:21:18 2012
@@ -27,7 +27,6 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -40,8 +39,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -76,8 +73,7 @@ public class TestCapacityScheduler {
@Before
public void setUp() throws Exception {
- RMStateStore store = StoreFactory.getStore(new Configuration());
- resourceManager = new ResourceManager(store);
+ resourceManager = new ResourceManager();
CapacitySchedulerConfiguration csConf
= new CapacitySchedulerConfiguration();
setupQueueConfiguration(csConf);
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java Wed Dec 19 04:21:18 2012
@@ -29,8 +29,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.junit.Before;
@@ -47,8 +45,7 @@ public class TestFSLeafQueue {
Configuration conf = createConfiguration();
// All tests assume only one assignment per node update
conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
- RMStateStore store = StoreFactory.getStore(conf);
- ResourceManager resourceManager = new ResourceManager(store);
+ ResourceManager resourceManager = new ResourceManager();
resourceManager.init(conf);
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Wed Dec 19 04:21:18 2012
@@ -51,8 +51,6 @@ import org.apache.hadoop.yarn.factories.
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -102,8 +100,7 @@ public class TestFairScheduler {
Configuration conf = createConfiguration();
// All tests assume only one assignment per node update
conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
- RMStateStore store = StoreFactory.getStore(conf);
- resourceManager = new ResourceManager(store);
+ resourceManager = new ResourceManager();
resourceManager.init(conf);
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java Wed Dec 19 04:21:18 2012
@@ -27,8 +27,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.junit.After;
import org.junit.Before;
@@ -50,8 +48,7 @@ public class TestFairSchedulerEventLog {
// All tests assume only one assignment per node update
conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
- RMStateStore store = StoreFactory.getStore(conf);
- resourceManager = new ResourceManager(store);
+ resourceManager = new ResourceManager();
resourceManager.init(conf);
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java Wed Dec 19 04:21:18 2012
@@ -38,8 +38,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -59,8 +57,7 @@ public class TestFifoScheduler {
@Before
public void setUp() throws Exception {
- RMStateStore store = StoreFactory.getStore(new Configuration());
- resourceManager = new ResourceManager(store);
+ resourceManager = new ResourceManager();
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER,
FifoScheduler.class, ResourceScheduler.class);
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java Wed Dec 19 04:21:18 2012
@@ -48,8 +48,6 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
@@ -154,8 +152,7 @@ public class MiniYARNCluster extends Com
getConfig().set(YarnConfiguration.RM_WEBAPP_ADDRESS,
MiniYARNCluster.getHostname() + ":0");
}
- RMStateStore store = StoreFactory.getStore(getConfig());
- resourceManager = new ResourceManager(store) {
+ resourceManager = new ResourceManager() {
@Override
protected void doSecureLogin() throws IOException {
// Don't try to login using keytab in the testcase.
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestRMNMSecretKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestRMNMSecretKeys.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestRMNMSecretKeys.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestRMNMSecretKeys.java Wed Dec 19 04:21:18 2012
@@ -47,7 +47,7 @@ public class TestRMNMSecretKeys {
// intervene
final DrainDispatcher dispatcher = new DrainDispatcher();
- ResourceManager rm = new ResourceManager(null) {
+ ResourceManager rm = new ResourceManager() {
@Override
protected void doSecureLogin() throws IOException {
// Do nothing.