You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by zj...@apache.org on 2014/08/19 22:33:50 UTC
svn commit: r1618972 - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-s...
Author: zjshen
Date: Tue Aug 19 20:33:49 2014
New Revision: 1618972
URL: http://svn.apache.org/r1618972
Log:
YARN-2249. Avoided AM release requests being lost on work preserving RM restart. Contributed by Jian He.
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1618972&r1=1618971&r2=1618972&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Aug 19 20:33:49 2014
@@ -214,6 +214,9 @@ Release 2.6.0 - UNRELEASED
YARN-2409. RM ActiveToStandBy transition missing stoping previous rmDispatcher.
(Rohith via jianhe)
+ YARN-2249. Avoided AM release requests being lost on work preserving RM
+ restart. (Jian He via zjshen)
+
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1618972&r1=1618971&r2=1618972&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Tue Aug 19 20:33:49 2014
@@ -23,10 +23,14 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -34,18 +38,25 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -54,6 +65,7 @@ import org.apache.hadoop.yarn.util.resou
import com.google.common.util.concurrent.SettableFuture;
+
@SuppressWarnings("unchecked")
public abstract class AbstractYarnScheduler
<T extends SchedulerApplicationAttempt, N extends SchedulerNode>
@@ -72,6 +84,7 @@ public abstract class AbstractYarnSchedu
protected RMContext rmContext;
protected Map<ApplicationId, SchedulerApplication<T>> applications;
+ protected int nmExpireInterval;
protected final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
@@ -87,6 +100,15 @@ public abstract class AbstractYarnSchedu
super(name);
}
+ /**
+ * Reads the NM expiry interval (RM_NM_EXPIRY_INTERVAL_MS) into
+ * nmExpireInterval, schedules the release-request cache cleanup, and then
+ * delegates to the parent service's init.
+ */
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ nmExpireInterval =
+ conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
+ // Must come after nmExpireInterval is assigned: the cleanup task is
+ // scheduled with that value as its delay. NOTE(review): this starts the
+ // cleanup timer at init time rather than start time.
+ createReleaseCache();
+ super.serviceInit(conf);
+ }
+
public synchronized List<Container> getTransferredContainers(
ApplicationAttemptId currentAttempt) {
ApplicationId appId = currentAttempt.getApplicationId();
@@ -281,6 +303,19 @@ public abstract class AbstractYarnSchedu
((RMContainerImpl)rmContainer).setAMContainer(true);
}
}
+
+ synchronized (schedulerAttempt) {
+ Set<ContainerId> releases = schedulerAttempt.getPendingRelease();
+ if (releases.contains(container.getContainerId())) {
+ // release the container
+ rmContainer.handle(new RMContainerFinishedEvent(container
+ .getContainerId(), SchedulerUtils.createAbnormalContainerStatus(
+ container.getContainerId(), SchedulerUtils.RELEASED_CONTAINER),
+ RMContainerEventType.RELEASED));
+ releases.remove(container.getContainerId());
+ LOG.info(container.getContainerId() + " is released by application.");
+ }
+ }
}
}
@@ -320,6 +355,62 @@ public abstract class AbstractYarnSchedu
}
}
+  /**
+   * Schedules a one-shot task that fires after {@code nmExpireInterval} ms and
+   * drains every application's pending-release cache. Any container id still
+   * cached at that point was never reported by an NM during recovery, so the
+   * release request is audit-logged as invalid and the id is discarded.
+   */
+  protected void createReleaseCache() {
+    // Daemon timer: this single pending task must not keep the JVM alive,
+    // e.g. when the RM is stopped before the interval has elapsed.
+    new Timer("ReleaseRequestCacheCleaner", true).schedule(new TimerTask() {
+      @Override
+      public void run() {
+        for (SchedulerApplication<T> app : applications.values()) {
+          T attempt = app.getCurrentAppAttempt();
+          // The application may have no current attempt (presumably before
+          // one is added); there is no cache to drain, and synchronizing on
+          // null would throw an NPE and abort the cleanup for all other apps.
+          if (attempt == null) {
+            continue;
+          }
+          synchronized (attempt) {
+            for (ContainerId containerId : attempt.getPendingRelease()) {
+              RMAuditLogger.logFailure(
+                  app.getUser(),
+                  AuditConstants.RELEASE_CONTAINER,
+                  "Unauthorized access or invalid container",
+                  "Scheduler",
+                  "Trying to release container not owned by app or with invalid id.",
+                  attempt.getApplicationId(), containerId);
+            }
+            attempt.getPendingRelease().clear();
+          }
+        }
+        LOG.info("Release request cache is cleaned up");
+      }
+    }, nmExpireInterval);
+  }
+
+ /**
+ * Scheduler-specific cleanup of a completed container.
+ *
+ * @param rmContainer the container that completed; the implementations in
+ * this change log and return when it is null
+ * @param containerStatus the final status to record for the container
+ * @param event the event that ended the container (e.g. RELEASED)
+ */
+ protected abstract void completedContainer(RMContainer rmContainer,
+ ContainerStatus containerStatus, RMContainerEventType event);
+
+  /**
+   * Releases the given containers on behalf of an application attempt.
+   *
+   * <p>A known container is completed with a RELEASED status. An unknown
+   * container id is handled in one of two ways:
+   * <ul>
+   * <li>if the RM started less than {@code nmExpireInterval} ms ago, its NM
+   * may not have reported the container yet during recovery, so the id is
+   * parked in the attempt's pending-release cache (it is released if the NM
+   * later reports it);</li>
+   * <li>otherwise the request is audit-logged as invalid.</li>
+   * </ul>
+   *
+   * @param containers ids of the containers the AM asked to release
+   * @param attempt the attempt that issued the release request
+   */
+  protected void releaseContainers(List<ContainerId> containers,
+      SchedulerApplicationAttempt attempt) {
+    for (ContainerId containerId : containers) {
+      RMContainer rmContainer = getRMContainer(containerId);
+      if (rmContainer == null) {
+        if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
+            < nmExpireInterval) {
+          LOG.info(containerId + " doesn't exist. Add the container"
+              + " to the release request cache as it maybe on recovery.");
+          synchronized (attempt) {
+            attempt.getPendingRelease().add(containerId);
+          }
+        } else {
+          RMAuditLogger.logFailure(attempt.getUser(),
+              AuditConstants.RELEASE_CONTAINER,
+              "Unauthorized access or invalid container", "Scheduler",
+              "Trying to release container not owned by app or with invalid id.",
+              attempt.getApplicationId(), containerId);
+        }
+        // Nothing to complete: the synchronized completedContainer() treats a
+        // null container as a no-op that only logs, so skip the lock + log.
+        continue;
+      }
+      completedContainer(rmContainer,
+          SchedulerUtils.createAbnormalContainerStatus(containerId,
+              SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED);
+    }
+  }
+
public SchedulerNode getSchedulerNode(NodeId nodeId) {
return nodes.get(nodeId);
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1618972&r1=1618971&r2=1618972&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Tue Aug 19 20:33:49 2014
@@ -17,13 +17,14 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
-import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -41,7 +42,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
+import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
@@ -87,6 +88,13 @@ public class SchedulerApplicationAttempt
protected List<RMContainer> newlyAllocatedContainers =
new ArrayList<RMContainer>();
+ // This pendingRelease set is used in the work-preserving recovery scenario
+ // to keep track of the AM's outstanding release requests. On recovery, the
+ // RM could receive a release request from the AM before it receives the
+ // container status from the NM. In that case, the to-be-recovered containers
+ // reported by the NM should not be recovered.
+ private Set<ContainerId> pendingRelease = null;
+
/**
* Count how many times the application has been given an opportunity
* to schedule a task at each priority. Each time the scheduler
@@ -114,7 +122,7 @@ public class SchedulerApplicationAttempt
new AppSchedulingInfo(applicationAttemptId, user, queue,
activeUsersManager, rmContext.getEpoch());
this.queue = queue;
-
+ this.pendingRelease = new HashSet<ContainerId>();
if (rmContext.getRMApps() != null &&
rmContext.getRMApps()
.containsKey(applicationAttemptId.getApplicationId())) {
@@ -163,6 +171,10 @@ public class SchedulerApplicationAttempt
return appSchedulingInfo.getResourceRequests(priority);
}
+  /**
+   * Gives direct access to this attempt's cache of container ids whose
+   * release was requested before the scheduler knew about the container.
+   * The live set is returned, not a copy; the scheduler call sites in this
+   * change mutate it while holding this attempt's monitor.
+   *
+   * @return the mutable pending-release set
+   */
+  public Set<ContainerId> getPendingRelease() {
+    return pendingRelease;
+  }
+
public int getNewContainerId() {
return appSchedulingInfo.getNewContainerId();
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1618972&r1=1618971&r2=1618972&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Tue Aug 19 20:33:49 2014
@@ -54,8 +54,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.*;
@@ -199,7 +197,7 @@ public class CapacityScheduler extends
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
private boolean overrideWithQueueMappings = false;
- private List<QueueMapping> mappings = new ArrayList<QueueMapping>();
+ private List<QueueMapping> mappings = null;
private Groups groups;
@VisibleForTesting
@@ -789,21 +787,7 @@ public class CapacityScheduler extends
getMinimumResourceCapability(), maximumAllocation);
// Release containers
- for (ContainerId releasedContainerId : release) {
- RMContainer rmContainer = getRMContainer(releasedContainerId);
- if (rmContainer == null) {
- RMAuditLogger.logFailure(application.getUser(),
- AuditConstants.RELEASE_CONTAINER,
- "Unauthorized access or invalid container", "CapacityScheduler",
- "Trying to release container not owned by app or with invalid id",
- application.getApplicationId(), releasedContainerId);
- }
- completedContainer(rmContainer,
- SchedulerUtils.createAbnormalContainerStatus(
- releasedContainerId,
- SchedulerUtils.RELEASED_CONTAINER),
- RMContainerEventType.RELEASED);
- }
+ releaseContainers(release, application);
synchronized (application) {
@@ -1098,7 +1082,8 @@ public class CapacityScheduler extends
}
@Lock(CapacityScheduler.class)
- private synchronized void completedContainer(RMContainer rmContainer,
+ @Override
+ protected synchronized void completedContainer(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
if (rmContainer == null) {
LOG.info("Null container completed...");
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1618972&r1=1618971&r2=1618972&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Tue Aug 19 20:33:49 2014
@@ -49,8 +49,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
@@ -810,7 +808,8 @@ public class FairScheduler extends
/**
* Clean up a completed container.
*/
- private synchronized void completedContainer(RMContainer rmContainer,
+ @Override
+ protected synchronized void completedContainer(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
if (rmContainer == null) {
LOG.info("Null container completed...");
@@ -913,21 +912,7 @@ public class FairScheduler extends
}
// Release containers
- for (ContainerId releasedContainerId : release) {
- RMContainer rmContainer = getRMContainer(releasedContainerId);
- if (rmContainer == null) {
- RMAuditLogger.logFailure(application.getUser(),
- AuditConstants.RELEASE_CONTAINER,
- "Unauthorized access or invalid container", "FairScheduler",
- "Trying to release container not owned by app or with invalid id",
- application.getApplicationId(), releasedContainerId);
- }
- completedContainer(rmContainer,
- SchedulerUtils.createAbnormalContainerStatus(
- releasedContainerId,
- SchedulerUtils.RELEASED_CONTAINER),
- RMContainerEventType.RELEASED);
- }
+ releaseContainers(release, application);
synchronized (application) {
if (!ask.isEmpty()) {
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1618972&r1=1618971&r2=1618972&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Tue Aug 19 20:33:49 2014
@@ -52,8 +52,6 @@ import org.apache.hadoop.yarn.conf.YarnC
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -89,7 +87,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -295,21 +292,7 @@ public class FifoScheduler extends
clusterResource, minimumAllocation, maximumAllocation);
// Release containers
- for (ContainerId releasedContainer : release) {
- RMContainer rmContainer = getRMContainer(releasedContainer);
- if (rmContainer == null) {
- RMAuditLogger.logFailure(application.getUser(),
- AuditConstants.RELEASE_CONTAINER,
- "Unauthorized access or invalid container", "FifoScheduler",
- "Trying to release container not owned by app or with invalid id",
- application.getApplicationId(), releasedContainer);
- }
- containerCompleted(rmContainer,
- SchedulerUtils.createAbnormalContainerStatus(
- releasedContainer,
- SchedulerUtils.RELEASED_CONTAINER),
- RMContainerEventType.RELEASED);
- }
+ releaseContainers(release, application);
synchronized (application) {
@@ -443,7 +426,7 @@ public class FifoScheduler extends
LOG.info("Skip killing " + container.getContainerId());
continue;
}
- containerCompleted(container,
+ completedContainer(container,
SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
RMContainerEventType.KILL);
@@ -717,7 +700,7 @@ public class FifoScheduler extends
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
LOG.debug("Container FINISHED: " + containerId);
- containerCompleted(getRMContainer(containerId),
+ completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED);
}
@@ -818,7 +801,7 @@ public class FifoScheduler extends
ContainerExpiredSchedulerEvent containerExpiredEvent =
(ContainerExpiredSchedulerEvent) event;
ContainerId containerid = containerExpiredEvent.getContainerId();
- containerCompleted(getRMContainer(containerid),
+ completedContainer(getRMContainer(containerid),
SchedulerUtils.createAbnormalContainerStatus(
containerid,
SchedulerUtils.EXPIRED_CONTAINER),
@@ -831,7 +814,8 @@ public class FifoScheduler extends
}
@Lock(FifoScheduler.class)
- private synchronized void containerCompleted(RMContainer rmContainer,
+ @Override
+ protected synchronized void completedContainer(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
if (rmContainer == null) {
LOG.info("Null container completed...");
@@ -881,7 +865,7 @@ public class FifoScheduler extends
}
// Kill running containers
for(RMContainer container : node.getRunningContainers()) {
- containerCompleted(container,
+ completedContainer(container,
SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(),
SchedulerUtils.LOST_CONTAINER),
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java?rev=1618972&r1=1618971&r2=1618972&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java Tue Aug 19 20:33:49 2014
@@ -49,7 +49,7 @@ public class MockAM {
private volatile int responseId = 0;
private final ApplicationAttemptId attemptId;
- private final RMContext context;
+ private RMContext context;
private ApplicationMasterProtocol amRMProtocol;
private final List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
@@ -61,8 +61,10 @@ public class MockAM {
this.amRMProtocol = amRMProtocol;
this.attemptId = attemptId;
}
-
- void setAMRMProtocol(ApplicationMasterProtocol amRMProtocol) {
+
+ /**
+ * Re-points this mock AM at a (possibly new) RM, e.g. after an RM restart,
+ * by replacing both the amRMProtocol and the RMContext it holds.
+ */
+ public void setAMRMProtocol(ApplicationMasterProtocol amRMProtocol,
+ RMContext context) {
+ this.context = context;
 this.amRMProtocol = amRMProtocol;
 }
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java?rev=1618972&r1=1618971&r2=1618972&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java Tue Aug 19 20:33:49 2014
@@ -171,7 +171,6 @@ public class TestApplicationMasterServic
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
- am1.setAMRMProtocol(rm.getApplicationMasterService());
AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl();
List<ContainerId> release = new ArrayList<ContainerId>();
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1618972&r1=1618971&r2=1618972&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Tue Aug 19 20:33:49 2014
@@ -289,7 +289,7 @@ public class TestRMRestart {
// verify old AM is not accepted
// change running AM to talk to new RM
- am1.setAMRMProtocol(rm2.getApplicationMasterService());
+ am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
AllocateResponse allocResponse = am1.allocate(
new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>());
@@ -1663,7 +1663,7 @@ public class TestRMRestart {
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
// recover app
RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
- am1.setAMRMProtocol(rm2.getApplicationMasterService());
+ am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true);
nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java?rev=1618972&r1=1618971&r2=1618972&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java Tue Aug 19 20:33:49 2014
@@ -33,10 +33,13 @@ import java.util.Set;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
@@ -72,6 +75,9 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import com.google.common.base.Supplier;
+
+
@SuppressWarnings({"rawtypes", "unchecked"})
@RunWith(value = Parameterized.class)
public class TestWorkPreservingRMRestart {
@@ -572,8 +578,8 @@ public class TestWorkPreservingRMRestart
rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED);
- am0.setAMRMProtocol(rm2.getApplicationMasterService());
- am0.registerAppAttempt(false);
+ am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
+ am0.registerAppAttempt(true);
rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
@@ -646,6 +652,69 @@ public class TestWorkPreservingRMRestart
waitForNumContainersToRecover(2, rm2, am0.getApplicationAttemptId());
}
+  // Tests that if the RM, while recovering, receives a container release
+  // request from the AM before the NM reports that container's status, the
+  // released container is not recovered and the AM sees it as completed.
+  @Test (timeout = 30000)
+  public void testReleasedContainerNotRecovered() throws Exception {
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    rm1 = new MockRM(conf, memStore);
+    MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    rm1.start();
+
+    RMApp app1 = rm1.submitApp(1024);
+    final MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // Re-start RM
+    conf.setInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 8000);
+    rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+    am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
+    am1.registerAppAttempt(true);
+
+    // try to release a container before the container is actually recovered.
+    final ContainerId runningContainer =
+        ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
+    am1.allocate(null, Arrays.asList(runningContainer));
+
+    // send container statuses to recover the containers
+    List<NMContainerStatus> containerStatuses =
+        createNMContainerStatusForApp(am1);
+    nm1.registerNode(containerStatuses, null);
+
+    // only the am container should be recovered.
+    waitForNumContainersToRecover(1, rm2, am1.getApplicationAttemptId());
+
+    final AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm2.getResourceScheduler();
+
+    AllocateResponse response = am1.allocate(null, null);
+    // AM gets notified of the completed container.
+    boolean receivedCompletedContainer = false;
+    for (ContainerStatus status : response.getCompletedContainersStatuses()) {
+      if (status.getContainerId().equals(runningContainer)) {
+        receivedCompletedContainer = true;
+        break;
+      }
+    }
+    assertTrue(receivedCompletedContainer);
+
+    // The pending-release cache lives on the attempt (not the scheduler), so
+    // wait for the attempt's cache to drain and confirm the released
+    // container was never recovered.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return scheduler.getApplicationAttempt(am1.getApplicationAttemptId())
+            .getPendingRelease().isEmpty()
+            && scheduler.getRMContainer(runningContainer) == null;
+      }
+    }, 1000, 20000);
+  }
+
private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
int appsPending, int appsRunning, int appsCompleted,
int allocatedContainers, int availableMB, int availableVirtualCores,
@@ -661,7 +730,7 @@ public class TestWorkPreservingRMRestart
assertEquals(allocatedVirtualCores, qm.getAllocatedVirtualCores());
}
- private void waitForNumContainersToRecover(int num, MockRM rm,
+ public static void waitForNumContainersToRecover(int num, MockRM rm,
ApplicationAttemptId attemptId) throws Exception {
AbstractYarnScheduler scheduler =
(AbstractYarnScheduler) rm.getResourceScheduler();
@@ -674,7 +743,9 @@ public class TestWorkPreservingRMRestart
attempt = scheduler.getApplicationAttempt(attemptId);
}
while (attempt.getLiveContainers().size() < num) {
- System.out.println("Wait for " + num + " containers to recover.");
+ System.out.println("Wait for " + num
+ + " containers to recover. currently: "
+ + attempt.getLiveContainers().size());
Thread.sleep(200);
}
}