You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by zj...@apache.org on 2014/08/19 22:40:37 UTC

svn commit: r1618973 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ hadoop-yarn/hadoop-yarn-server/h...

Author: zjshen
Date: Tue Aug 19 20:40:37 2014
New Revision: 1618973

URL: http://svn.apache.org/r1618973
Log:
YARN-2249. Avoided AM release requests being lost on work preserving RM restart. Contributed by Jian He.
svn merge --ignore-ancestry -c 1618972 ../../trunk/

Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1618973&r1=1618972&r2=1618973&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Tue Aug 19 20:40:37 2014
@@ -196,6 +196,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2409. RM ActiveToStandBy transition missing stoping previous rmDispatcher.
     (Rohith via jianhe)
 
+    YARN-2249. Avoided AM release requests being lost on work preserving RM
+    restart. (Jian He via zjshen)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1618973&r1=1618972&r2=1618973&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Tue Aug 19 20:40:37 2014
@@ -23,10 +23,14 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -34,18 +38,25 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -54,6 +65,7 @@ import org.apache.hadoop.yarn.util.resou
 
 import com.google.common.util.concurrent.SettableFuture;
 
+
 @SuppressWarnings("unchecked")
 public abstract class AbstractYarnScheduler
     <T extends SchedulerApplicationAttempt, N extends SchedulerNode>
@@ -72,6 +84,7 @@ public abstract class AbstractYarnSchedu
 
   protected RMContext rmContext;
   protected Map<ApplicationId, SchedulerApplication<T>> applications;
+  protected int nmExpireInterval;
 
   protected final static List<Container> EMPTY_CONTAINER_LIST =
       new ArrayList<Container>();
@@ -87,6 +100,15 @@ public abstract class AbstractYarnSchedu
     super(name);
   }
 
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    nmExpireInterval =
+        conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
+          YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
+    createReleaseCache();
+    super.serviceInit(conf);
+  }
+
   public synchronized List<Container> getTransferredContainers(
       ApplicationAttemptId currentAttempt) {
     ApplicationId appId = currentAttempt.getApplicationId();
@@ -281,6 +303,19 @@ public abstract class AbstractYarnSchedu
           ((RMContainerImpl)rmContainer).setAMContainer(true);
         }
       }
+
+      synchronized (schedulerAttempt) {
+        Set<ContainerId> releases = schedulerAttempt.getPendingRelease();
+        if (releases.contains(container.getContainerId())) {
+          // release the container
+          rmContainer.handle(new RMContainerFinishedEvent(container
+            .getContainerId(), SchedulerUtils.createAbnormalContainerStatus(
+            container.getContainerId(), SchedulerUtils.RELEASED_CONTAINER),
+            RMContainerEventType.RELEASED));
+          releases.remove(container.getContainerId());
+          LOG.info(container.getContainerId() + " is released by application.");
+        }
+      }
     }
   }
 
@@ -320,6 +355,62 @@ public abstract class AbstractYarnSchedu
     }
   }
 
+  protected void createReleaseCache() {
+    // Cleanup the cache after nm expire interval.
+    new Timer().schedule(new TimerTask() {
+      @Override
+      public void run() {
+        for (SchedulerApplication<T> app : applications.values()) {
+
+          T attempt = app.getCurrentAppAttempt();
+          synchronized (attempt) {
+            for (ContainerId containerId : attempt.getPendingRelease()) {
+              RMAuditLogger.logFailure(
+                app.getUser(),
+                AuditConstants.RELEASE_CONTAINER,
+                "Unauthorized access or invalid container",
+                "Scheduler",
+                "Trying to release container not owned by app or with invalid id.",
+                attempt.getApplicationId(), containerId);
+            }
+            attempt.getPendingRelease().clear();
+          }
+        }
+        LOG.info("Release request cache is cleaned up");
+      }
+    }, nmExpireInterval);
+  }
+
+  // clean up a completed container
+  protected abstract void completedContainer(RMContainer rmContainer,
+      ContainerStatus containerStatus, RMContainerEventType event);
+
+  protected void releaseContainers(List<ContainerId> containers,
+      SchedulerApplicationAttempt attempt) {
+    for (ContainerId containerId : containers) {
+      RMContainer rmContainer = getRMContainer(containerId);
+      if (rmContainer == null) {
+        if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
+            < nmExpireInterval) {
+          LOG.info(containerId + " doesn't exist. Add the container"
+              + " to the release request cache as it maybe on recovery.");
+          synchronized (attempt) {
+            attempt.getPendingRelease().add(containerId);
+          }
+        } else {
+          RMAuditLogger.logFailure(attempt.getUser(),
+            AuditConstants.RELEASE_CONTAINER,
+            "Unauthorized access or invalid container", "Scheduler",
+            "Trying to release container not owned by app or with invalid id.",
+            attempt.getApplicationId(), containerId);
+        }
+      }
+      completedContainer(rmContainer,
+        SchedulerUtils.createAbnormalContainerStatus(containerId,
+          SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED);
+    }
+  }
+
   public SchedulerNode getSchedulerNode(NodeId nodeId) {
     return nodes.get(nodeId);
   }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1618973&r1=1618972&r2=1618973&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Tue Aug 19 20:40:37 2014
@@ -17,13 +17,14 @@
 */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
-import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -41,7 +42,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Multiset;
 
@@ -87,6 +88,13 @@ public class SchedulerApplicationAttempt
   protected List<RMContainer> newlyAllocatedContainers = 
       new ArrayList<RMContainer>();
 
+  // This pendingRelease is used in work-preserving recovery scenario to keep
+  // track of the AM's outstanding release requests. RM on recovery could
+  // receive the release request from AM before it receives the container status
+  // from NM for recovery. In this case, the to-be-recovered containers reported
+  // by NM should not be recovered.
+  private Set<ContainerId> pendingRelease = null;
+
   /**
    * Count how many times the application has been given an opportunity
    * to schedule a task at each priority. Each time the scheduler
@@ -114,7 +122,7 @@ public class SchedulerApplicationAttempt
         new AppSchedulingInfo(applicationAttemptId, user, queue,  
             activeUsersManager, rmContext.getEpoch());
     this.queue = queue;
-    
+    this.pendingRelease = new HashSet<ContainerId>();
     if (rmContext.getRMApps() != null &&
         rmContext.getRMApps()
             .containsKey(applicationAttemptId.getApplicationId())) {
@@ -163,6 +171,10 @@ public class SchedulerApplicationAttempt
     return appSchedulingInfo.getResourceRequests(priority);
   }
 
+  public Set<ContainerId> getPendingRelease() {
+    return this.pendingRelease;
+  }
+
   public int getNewContainerId() {
     return appSchedulingInfo.getNewContainerId();
   }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1618973&r1=1618972&r2=1618973&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Tue Aug 19 20:40:37 2014
@@ -54,8 +54,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.*;
@@ -199,7 +197,7 @@ public class CapacityScheduler extends
   private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
   
   private boolean overrideWithQueueMappings = false;
-  private List<QueueMapping> mappings = new ArrayList<QueueMapping>();
+  private List<QueueMapping> mappings = null;
   private Groups groups;
 
   @VisibleForTesting
@@ -789,21 +787,7 @@ public class CapacityScheduler extends
         getMinimumResourceCapability(), maximumAllocation);
 
     // Release containers
-    for (ContainerId releasedContainerId : release) {
-      RMContainer rmContainer = getRMContainer(releasedContainerId);
-      if (rmContainer == null) {
-         RMAuditLogger.logFailure(application.getUser(),
-             AuditConstants.RELEASE_CONTAINER, 
-             "Unauthorized access or invalid container", "CapacityScheduler",
-             "Trying to release container not owned by app or with invalid id",
-             application.getApplicationId(), releasedContainerId);
-      }
-      completedContainer(rmContainer,
-          SchedulerUtils.createAbnormalContainerStatus(
-              releasedContainerId, 
-              SchedulerUtils.RELEASED_CONTAINER),
-          RMContainerEventType.RELEASED);
-    }
+    releaseContainers(release, application);
 
     synchronized (application) {
 
@@ -1098,7 +1082,8 @@ public class CapacityScheduler extends
   }
   
   @Lock(CapacityScheduler.class)
-  private synchronized void completedContainer(RMContainer rmContainer,
+  @Override
+  protected synchronized void completedContainer(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event) {
     if (rmContainer == null) {
       LOG.info("Null container completed...");

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1618973&r1=1618972&r2=1618973&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Tue Aug 19 20:40:37 2014
@@ -49,8 +49,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
@@ -810,7 +808,8 @@ public class FairScheduler extends
   /**
    * Clean up a completed container.
    */
-  private synchronized void completedContainer(RMContainer rmContainer,
+  @Override
+  protected synchronized void completedContainer(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event) {
     if (rmContainer == null) {
       LOG.info("Null container completed...");
@@ -913,21 +912,7 @@ public class FairScheduler extends
     }
 
     // Release containers
-    for (ContainerId releasedContainerId : release) {
-      RMContainer rmContainer = getRMContainer(releasedContainerId);
-      if (rmContainer == null) {
-        RMAuditLogger.logFailure(application.getUser(),
-            AuditConstants.RELEASE_CONTAINER,
-            "Unauthorized access or invalid container", "FairScheduler",
-            "Trying to release container not owned by app or with invalid id",
-            application.getApplicationId(), releasedContainerId);
-      }
-      completedContainer(rmContainer,
-          SchedulerUtils.createAbnormalContainerStatus(
-              releasedContainerId,
-              SchedulerUtils.RELEASED_CONTAINER),
-          RMContainerEventType.RELEASED);
-    }
+    releaseContainers(release, application);
 
     synchronized (application) {
       if (!ask.isEmpty()) {

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1618973&r1=1618972&r2=1618973&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Tue Aug 19 20:40:37 2014
@@ -52,8 +52,6 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -89,7 +87,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -295,21 +292,7 @@ public class FifoScheduler extends
         clusterResource, minimumAllocation, maximumAllocation);
 
     // Release containers
-    for (ContainerId releasedContainer : release) {
-      RMContainer rmContainer = getRMContainer(releasedContainer);
-      if (rmContainer == null) {
-         RMAuditLogger.logFailure(application.getUser(),
-             AuditConstants.RELEASE_CONTAINER, 
-             "Unauthorized access or invalid container", "FifoScheduler", 
-             "Trying to release container not owned by app or with invalid id",
-             application.getApplicationId(), releasedContainer);
-      }
-      containerCompleted(rmContainer,
-          SchedulerUtils.createAbnormalContainerStatus(
-              releasedContainer, 
-              SchedulerUtils.RELEASED_CONTAINER),
-          RMContainerEventType.RELEASED);
-    }
+    releaseContainers(release, application);
 
     synchronized (application) {
 
@@ -443,7 +426,7 @@ public class FifoScheduler extends
         LOG.info("Skip killing " + container.getContainerId());
         continue;
       }
-      containerCompleted(container,
+      completedContainer(container,
         SchedulerUtils.createAbnormalContainerStatus(
           container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
         RMContainerEventType.KILL);
@@ -717,7 +700,7 @@ public class FifoScheduler extends
     for (ContainerStatus completedContainer : completedContainers) {
       ContainerId containerId = completedContainer.getContainerId();
       LOG.debug("Container FINISHED: " + containerId);
-      containerCompleted(getRMContainer(containerId), 
+      completedContainer(getRMContainer(containerId), 
           completedContainer, RMContainerEventType.FINISHED);
     }
 
@@ -818,7 +801,7 @@ public class FifoScheduler extends
       ContainerExpiredSchedulerEvent containerExpiredEvent = 
           (ContainerExpiredSchedulerEvent) event;
       ContainerId containerid = containerExpiredEvent.getContainerId();
-      containerCompleted(getRMContainer(containerid), 
+      completedContainer(getRMContainer(containerid), 
           SchedulerUtils.createAbnormalContainerStatus(
               containerid, 
               SchedulerUtils.EXPIRED_CONTAINER),
@@ -831,7 +814,8 @@ public class FifoScheduler extends
   }
 
   @Lock(FifoScheduler.class)
-  private synchronized void containerCompleted(RMContainer rmContainer,
+  @Override
+  protected synchronized void completedContainer(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event) {
     if (rmContainer == null) {
       LOG.info("Null container completed...");
@@ -881,7 +865,7 @@ public class FifoScheduler extends
     }
     // Kill running containers
     for(RMContainer container : node.getRunningContainers()) {
-      containerCompleted(container, 
+      completedContainer(container, 
           SchedulerUtils.createAbnormalContainerStatus(
               container.getContainerId(), 
               SchedulerUtils.LOST_CONTAINER),

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java?rev=1618973&r1=1618972&r2=1618973&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java Tue Aug 19 20:40:37 2014
@@ -49,7 +49,7 @@ public class MockAM {
 
   private volatile int responseId = 0;
   private final ApplicationAttemptId attemptId;
-  private final RMContext context;
+  private RMContext context;
   private ApplicationMasterProtocol amRMProtocol;
 
   private final List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
@@ -61,8 +61,10 @@ public class MockAM {
     this.amRMProtocol = amRMProtocol;
     this.attemptId = attemptId;
   }
-  
-  void setAMRMProtocol(ApplicationMasterProtocol amRMProtocol) {
+
+  public void setAMRMProtocol(ApplicationMasterProtocol amRMProtocol,
+      RMContext context) {
+    this.context = context;
     this.amRMProtocol = amRMProtocol;
   }
 

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java?rev=1618973&r1=1618972&r2=1618973&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java Tue Aug 19 20:40:37 2014
@@ -171,7 +171,6 @@ public class TestApplicationMasterServic
     RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
     MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
     am1.registerAppAttempt();
-    am1.setAMRMProtocol(rm.getApplicationMasterService());
 
     AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl();
     List<ContainerId> release = new ArrayList<ContainerId>();

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1618973&r1=1618972&r2=1618973&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Tue Aug 19 20:40:37 2014
@@ -289,7 +289,7 @@ public class TestRMRestart {
     
     // verify old AM is not accepted
     // change running AM to talk to new RM
-    am1.setAMRMProtocol(rm2.getApplicationMasterService());
+    am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
     AllocateResponse allocResponse = am1.allocate(
         new ArrayList<ResourceRequest>(),
         new ArrayList<ContainerId>());
@@ -1663,7 +1663,7 @@ public class TestRMRestart {
     nm1.setResourceTrackerService(rm2.getResourceTrackerService());
     // recover app
     RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
-    am1.setAMRMProtocol(rm2.getApplicationMasterService());
+    am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
     am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
     nm1.nodeHeartbeat(true);
     nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java?rev=1618973&r1=1618972&r2=1618973&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java Tue Aug 19 20:40:37 2014
@@ -33,10 +33,13 @@ import java.util.Set;
 
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
@@ -72,6 +75,9 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import com.google.common.base.Supplier;
+
+
 @SuppressWarnings({"rawtypes", "unchecked"})
 @RunWith(value = Parameterized.class)
 public class TestWorkPreservingRMRestart {
@@ -572,8 +578,8 @@ public class TestWorkPreservingRMRestart
     rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
     rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED);
 
-    am0.setAMRMProtocol(rm2.getApplicationMasterService());
-    am0.registerAppAttempt(false);
+    am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
+    am0.registerAppAttempt(true);
 
     rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
     rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
@@ -646,6 +652,69 @@ public class TestWorkPreservingRMRestart
     waitForNumContainersToRecover(2, rm2, am0.getApplicationAttemptId());
   }
 
+  // Test that if the RM, on recovery, receives a container release request
+  // from the AM before it receives that container's status reported by the
+  // NM for recovery, the container is not recovered.
+  @Test (timeout = 30000)
+  public void testReleasedContainerNotRecovered() throws Exception {
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    rm1 = new MockRM(conf, memStore);
+    MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    rm1.start();
+
+    RMApp app1 = rm1.submitApp(1024);
+    final MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // Re-start RM
+    conf.setInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 8000);
+    rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+    am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
+    am1.registerAppAttempt(true);
+
+    // try to release a container before the container is actually recovered.
+    final ContainerId runningContainer =
+        ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
+    am1.allocate(null, Arrays.asList(runningContainer));
+
+    // send container statuses to recover the containers
+    List<NMContainerStatus> containerStatuses =
+        createNMContainerStatusForApp(am1);
+    nm1.registerNode(containerStatuses, null);
+
+    // only the am container should be recovered.
+    waitForNumContainersToRecover(1, rm2, am1.getApplicationAttemptId());
+
+    final AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm2.getResourceScheduler();
+    // cached release request is cleaned.
+    // assertFalse(scheduler.getPendingRelease().contains(runningContainer));
+
+    AllocateResponse response = am1.allocate(null, null);
+    // AM gets notified of the completed container.
+    boolean receivedCompletedContainer = false;
+    for (ContainerStatus status : response.getCompletedContainersStatuses()) {
+      if (status.getContainerId().equals(runningContainer)) {
+        receivedCompletedContainer = true;
+      }
+    }
+    assertTrue(receivedCompletedContainer);
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      public Boolean get() {
+        // the release cache is cleaned up and the previously running
+        // container is not recovered
+        return scheduler.getApplicationAttempt(am1.getApplicationAttemptId())
+          .getPendingRelease().isEmpty()
+            && scheduler.getRMContainer(runningContainer) == null;
+      }
+    }, 1000, 20000);
+  }
+
   private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
       int appsPending, int appsRunning, int appsCompleted,
       int allocatedContainers, int availableMB, int availableVirtualCores,
@@ -661,7 +730,7 @@ public class TestWorkPreservingRMRestart
     assertEquals(allocatedVirtualCores, qm.getAllocatedVirtualCores());
   }
 
-  private void waitForNumContainersToRecover(int num, MockRM rm,
+  public static void waitForNumContainersToRecover(int num, MockRM rm,
       ApplicationAttemptId attemptId) throws Exception {
     AbstractYarnScheduler scheduler =
         (AbstractYarnScheduler) rm.getResourceScheduler();
@@ -674,7 +743,9 @@ public class TestWorkPreservingRMRestart
       attempt = scheduler.getApplicationAttempt(attemptId);
     }
     while (attempt.getLiveContainers().size() < num) {
-      System.out.println("Wait for " + num + " containers to recover.");
+      System.out.println("Wait for " + num
+          + " containers to recover. currently: "
+          + attempt.getLiveContainers().size());
       Thread.sleep(200);
     }
   }