Posted to yarn-commits@hadoop.apache.org by cd...@apache.org on 2013/05/09 23:21:42 UTC

svn commit: r1480778 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-reso...

Author: cdouglas
Date: Thu May  9 21:21:42 2013
New Revision: 1480778

URL: http://svn.apache.org/r1480778
Log:
YARN-568. Add support for work preserving preemption to the FairScheduler.
Contributed by Carlo Curino and Sandy Ryza

Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1480778&r1=1480777&r2=1480778&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Thu May  9 21:21:42 2013
@@ -10,9 +10,6 @@ Trunk - Unreleased 
     Azure environments. (See breakdown of tasks below for subtasks and
     contributors)
 
-    YARN-45. Add protocol for schedulers to request containers back from
-    ApplicationMasters. (Carlo Curino, cdouglas)
-
   IMPROVEMENTS
 
     YARN-84. Use Builder to build RPC server. (Brandon Li via suresh)
@@ -129,6 +126,9 @@ Release 2.0.5-beta - UNRELEASED
     YARN-482. FS: Extend SchedulingMode to intermediate queues. 
     (kkambatl via tucu)
 
+    YARN-45. Add protocol for schedulers to request containers back from
+    ApplicationMasters. (Carlo Curino, cdouglas)
+
   IMPROVEMENTS
 
     YARN-365. Change NM heartbeat handling to not generate a scheduler event
@@ -235,6 +235,9 @@ Release 2.0.5-beta - UNRELEASED
     tokens for app attempt so that RM can be restarted while preserving current
     applications. (Jian He via vinodkv)
 
+    YARN-568. Add support for work preserving preemption to the FairScheduler.
+    (Carlo Curino and Sandy Ryza via cdouglas)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1480778&r1=1480777&r2=1480778&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Thu May  9 21:21:42 2013
@@ -21,7 +21,9 @@ package org.apache.hadoop.yarn.server.re
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -39,6 +41,11 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContainer;
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContract;
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StrictPreemptionContract;
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionMessage;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -341,9 +348,65 @@ public class ApplicationMasterService ex
       }
       
       allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
+   
+      // add the preemption message (if any) to the allocateResponse
+      allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation));
+      
       return allocateResponse;
     }
   }
+  
+  private PreemptionMessage generatePreemptionMessage(Allocation allocation){
+    PreemptionMessage pMsg = null;
+    // assemble strict preemption request
+    if (allocation.getStrictContainerPreemptions() != null) {
+      pMsg =
+          recordFactory.newRecordInstance(PreemptionMessage.class);
+      StrictPreemptionContract pStrict =
+          recordFactory.newRecordInstance(StrictPreemptionContract.class);
+      Set<PreemptionContainer> pCont = new HashSet<PreemptionContainer>();
+      for (ContainerId cId : allocation.getStrictContainerPreemptions()) {
+        PreemptionContainer pc =
+            recordFactory.newRecordInstance(PreemptionContainer.class);
+        pc.setId(cId);
+        pCont.add(pc);
+      }
+      pStrict.setContainers(pCont);
+      pMsg.setStrictContract(pStrict);
+    }
+
+    // assemble negotiable preemption request
+    if (allocation.getResourcePreemptions() != null &&
+        allocation.getResourcePreemptions().size() > 0 &&
+        allocation.getContainerPreemptions() != null &&
+        allocation.getContainerPreemptions().size() > 0) {
+      if (pMsg == null) {
+        pMsg =
+            recordFactory.newRecordInstance(PreemptionMessage.class);
+      }
+      PreemptionContract contract =
+          recordFactory.newRecordInstance(PreemptionContract.class);
+      Set<PreemptionContainer> pCont = new HashSet<PreemptionContainer>();
+      for (ContainerId cId : allocation.getContainerPreemptions()) {
+        PreemptionContainer pc =
+            recordFactory.newRecordInstance(PreemptionContainer.class);
+        pc.setId(cId);
+        pCont.add(pc);
+      }
+      List<PreemptionResourceRequest> pRes = new ArrayList<PreemptionResourceRequest>();
+      for (ResourceRequest crr : allocation.getResourcePreemptions()) {
+        PreemptionResourceRequest prr =
+            recordFactory.newRecordInstance(PreemptionResourceRequest.class);
+        prr.setResourceRequest(crr);
+        pRes.add(prr);
+      }
+      contract.setContainers(pCont);
+      contract.setResourceRequest(pRes);
+      pMsg.setContract(contract);
+    }
+    
+    return pMsg;
+  }
 
   public void registerAppAttempt(ApplicationAttemptId attemptId) {
     AllocateResponse response =

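For context, the PreemptionMessage assembled above travels to the ApplicationMaster inside the AllocateResponse. Below is a minimal, hypothetical sketch of an AM-side reaction; it assumes only the record accessors implied by the setters in this patch (getPreemptionMessage, getStrictContract, getContract, getContainers, getId), and the two application hooks are made-up names:

    // Hypothetical AM-side handler; a sketch, not part of this patch.
    private void handlePreemption(AllocateResponse response) {
      PreemptionMessage msg = response.getPreemptionMessage();
      if (msg == null) {
        return; // the RM is not asking for anything back
      }
      if (msg.getStrictContract() != null) {
        // Strict contract: these exact containers will be reclaimed, so
        // checkpoint and release them right away.
        for (PreemptionContainer pc : msg.getStrictContract().getContainers()) {
          checkpointAndRelease(pc.getId()); // hypothetical application hook
        }
      }
      if (msg.getContract() != null) {
        // Negotiable contract: give back an equivalent amount of resources in
        // any containers; those listed are killed if the AM does nothing.
        for (PreemptionContainer pc : msg.getContract().getContainers()) {
          releaseIfConvenient(pc.getId()); // hypothetical application hook
        }
      }
    }
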
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java?rev=1480778&r1=1480777&r2=1480778&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java Thu May  9 21:21:42 2013
@@ -19,17 +19,43 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 
 public class Allocation {
+  
+  private final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+
   final List<Container> containers;
   final Resource resourceLimit;
-  
+  final Set<ContainerId> strictContainers;
+  final Set<ContainerId> fungibleContainers;
+  final List<ResourceRequest> fungibleResources;
+
   public Allocation(List<Container> containers, Resource resourceLimit) {
+    this(containers, resourceLimit, null, null, null);
+  }
+
+  public Allocation(List<Container> containers, Resource resourceLimit,
+      Set<ContainerId> strictContainers) {
+    this(containers, resourceLimit, strictContainers, null, null);
+  }
+
+  public Allocation(List<Container> containers, Resource resourceLimit,
+      Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
+      List<ResourceRequest> fungibleResources) {
     this.containers = containers;
     this.resourceLimit = resourceLimit;
+    this.strictContainers = strictContainers;
+    this.fungibleContainers = fungibleContainers;
+    this.fungibleResources = fungibleResources;
   }
 
   public List<Container> getContainers() {
@@ -39,5 +65,17 @@ public class Allocation {
   public Resource getResourceLimit() {
     return resourceLimit;
   }
-  
+
+  public Set<ContainerId> getStrictContainerPreemptions() {
+    return strictContainers;
+  }
+
+  public Set<ContainerId> getContainerPreemptions() {
+    return fungibleContainers;
+  }
+
+  public List<ResourceRequest> getResourcePreemptions() {
+    return fungibleResources;
+  }
+    
 }

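As a usage note on the widened constructor above: a scheduler that does not preempt keeps the old two-argument form, while a preempting scheduler threads its container sets through. A sketch with illustrative variable names (pullNewlyAllocatedContainers and getHeadroom are the real methods used by FairScheduler.allocate below; the three preemption arguments are placeholders):

    // Non-preempting call site: unchanged.
    Allocation plain = new Allocation(
        app.pullNewlyAllocatedContainers(), app.getHeadroom());

    // Preempting call site: also report which containers must or may be
    // returned, and which resource asks the preemption corresponds to.
    Allocation withPreemption = new Allocation(
        app.pullNewlyAllocatedContainers(), app.getHeadroom(),
        strictContainerIds,    // Set<ContainerId>: will be reclaimed
        fungibleContainerIds,  // Set<ContainerId>: negotiable candidates
        fungibleResourceAsks); // List<ResourceRequest>: what the RM wants back
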
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1480778&r1=1480777&r2=1480778&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Thu May  9 21:21:42 2013
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -83,7 +84,9 @@ public class FSSchedulerApp extends Sche
 
   final Map<Priority, Map<NodeId, RMContainer>> reservedContainers = 
       new HashMap<Priority, Map<NodeId, RMContainer>>();
-  
+
+  final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
+
   /**
    * Count how many times the application has been given an opportunity
    * to schedule a task at each priority. Each time the scheduler
@@ -230,6 +233,9 @@ public class FSSchedulerApp extends Sche
     Resource containerResource = rmContainer.getContainer().getResource();
     queue.getMetrics().releaseResources(getUser(), 1, containerResource);
     Resources.subtractFrom(currentConsumption, containerResource);
+
+    // remove from preemption map if it is completed
+    preemptionMap.remove(rmContainer);
   }
 
   synchronized public List<Container> pullNewlyAllocatedContainers() {
@@ -572,4 +578,18 @@ public class FSSchedulerApp extends Sche
         " priority " + priority);
     allowedLocalityLevel.put(priority, level);
   }
+
+  // Preemption-related methods
+  public void addPreemption(RMContainer container, long time) {
+    assert preemptionMap.get(container) == null;
+    preemptionMap.put(container, time);
+  }
+
+  public Long getContainerPreemptionTime(RMContainer container) {
+    return preemptionMap.get(container);
+  }
+
+  public Set<RMContainer> getPreemptionContainers() {
+    return preemptionMap.keySet();
+  }
 }

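To make the bookkeeping above concrete: preemptionMap records the wall-clock time at which each container was first warned, containerCompleted() drops entries for containers the AM gave back on its own, and the scheduler later compares the stored time against its kill timeout. An illustrative timeline (the timestamps are made up):

    // t = 1000 ms: the scheduler issues the first warning for this container.
    app.addPreemption(container, 1000L);

    // Later, the warning time is looked up to check the grace period.
    Long warnedAt = app.getContainerPreemptionTime(container); // returns 1000

    // If the AM released the container in the meantime, containerCompleted()
    // already removed it, so getContainerPreemptionTime() returns null and
    // no kill is issued.
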
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1480778&r1=1480777&r2=1480778&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Thu May  9 21:21:42 2013
@@ -24,8 +24,11 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
@@ -155,10 +158,16 @@ public class FairScheduler implements Re
   private Resource clusterCapacity = 
       RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
 
-  // How often tasks are preempted (must be longer than a couple
+  // Interval (ms) between checks for whether to preempt tasks
+  protected long preemptionInterval; 
+  
+  // ms to wait before force-killing a container (must be longer than a couple
   // of heartbeats to give task-kill commands a chance to act).
-  protected long preemptionInterval = 15000;
-
+  protected long waitTimeBeforeKill; 
+  
+  // Containers whose AMs have been warned that they will be preempted soon.
+  private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
+  
   protected boolean preemptionEnabled;
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
   protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
@@ -335,34 +344,78 @@ public class FairScheduler implements Re
     // Sort containers into reverse order of priority
     Collections.sort(runningContainers, new Comparator<RMContainer>() {
       public int compare(RMContainer c1, RMContainer c2) {
-        return c2.getContainer().getPriority().compareTo(
+        int ret = c2.getContainer().getPriority().compareTo(
             c1.getContainer().getPriority());
+        if (ret == 0) {
+          return c2.getContainerId().compareTo(c1.getContainerId());
+        }
+        return ret;
       }
     });
+    
+    // Scan down the list of containers we've already warned and kill them
+    // if we need to.  Remove any containers from the list that we don't need
+    // or that are no longer running.
+    Iterator<RMContainer> warnedIter = warnedContainers.iterator();
+    Set<RMContainer> preemptedThisRound = new HashSet<RMContainer>();
+    while (warnedIter.hasNext()) {
+      RMContainer container = warnedIter.next();
+      if (container.getState() == RMContainerState.RUNNING &&
+          Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+              toPreempt, Resources.none())) {
+        warnOrKillContainer(container, apps.get(container), queues.get(container));
+        preemptedThisRound.add(container);
+        Resources.subtractFrom(toPreempt, container.getContainer().getResource());
+      } else {
+        warnedIter.remove();
+      }
+    }
 
-    // Scan down the sorted list of task statuses until we've killed enough
-    // tasks, making sure we don't kill too many from any queue
-    for (RMContainer container : runningContainers) {
+    // Scan down the rest of the containers until we've preempted enough, making
+    // sure we don't preempt too many from any queue
+    Iterator<RMContainer> runningIter = runningContainers.iterator();
+    while (runningIter.hasNext() &&
+        Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+            toPreempt, Resources.none())) {
+      RMContainer container = runningIter.next();
       FSLeafQueue sched = queues.get(container);
-      if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
-          sched.getResourceUsage(), sched.getFairShare())) {
-        LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
-            "res=" + container.getContainer().getResource() +
-            ") from queue " + sched.getName());
-        ContainerStatus status = SchedulerUtils.createAbnormalContainerStatus(
+      if (!preemptedThisRound.contains(container) &&
+          Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+              sched.getResourceUsage(), sched.getFairShare())) {
+        warnOrKillContainer(container, apps.get(container), sched);
+        
+        warnedContainers.add(container);
+        Resources.subtractFrom(toPreempt, container.getContainer().getResource());
+      }
+    }
+  }
+  
+  private void warnOrKillContainer(RMContainer container, FSSchedulerApp app,
+      FSLeafQueue queue) {
+    LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
+        "res=" + container.getContainer().getResource() +
+        ") from queue " + queue.getName());
+    
+    Long time = app.getContainerPreemptionTime(container);
+
+    if (time != null) {
+      // if we asked for preemption more than waitTimeBeforeKill ms ago,
+      // proceed with kill
+      if (time + waitTimeBeforeKill < clock.getTime()) {
+        ContainerStatus status =
+          SchedulerUtils.createAbnormalContainerStatus(
             container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
 
         // TODO: Not sure if this ever actually adds this to the list of cleanup
         // containers on the RMNode (see SchedulerNode.releaseContainer()).
         completedContainer(container, status, RMContainerEventType.KILL);
-
-        toPreempt = Resources.subtract(toPreempt,
-            container.getContainer().getResource());
-        if (Resources.lessThanOrEqual(RESOURCE_CALCULATOR, clusterCapacity,
-            toPreempt, Resources.none())) {
-          break;
-        }
+        LOG.info("Killing container" + container +
+            " (after waiting for premption for " +
+            (clock.getTime() - time) + "ms)");
       }
+    } else {
+      // track the request in the FSSchedulerApp itself
+      app.addPreemption(container, clock.getTime());
     }
   }
 
@@ -487,11 +540,11 @@ public class FairScheduler implements Re
     return clusterCapacity;
   }
 
-  public Clock getClock() {
+  public synchronized Clock getClock() {
     return clock;
   }
 
-  protected void setClock(Clock clock) {
+  protected synchronized void setClock(Clock clock) {
     this.clock = clock;
   }
 
@@ -746,10 +799,18 @@ public class FairScheduler implements Re
         LOG.debug("allocate:" +
             " applicationAttemptId=" + appAttemptId +
             " #ask=" + ask.size());
-      }
 
+        LOG.debug("Preempting " + application.getPreemptionContainers().size()
+            + " container(s)");
+      }
+      
+      Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>();
+      for (RMContainer container : application.getPreemptionContainers()) {
+        preemptionContainerIds.add(container.getContainerId());
+      }
+      
       return new Allocation(application.pullNewlyAllocatedContainers(),
-          application.getHeadroom());
+          application.getHeadroom(), preemptionContainerIds);
     }
   }
 
@@ -950,7 +1011,9 @@ public class FairScheduler implements Re
     assignMultiple = this.conf.getAssignMultiple();
     maxAssign = this.conf.getMaxAssign();
     sizeBasedWeight = this.conf.getSizeBasedWeight();
-    
+    preemptionInterval = this.conf.getPreemptionInterval();
+    waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
+
     if (!initialized) {
       rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
       this.rmContext = rmContext;

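Taken together, preemptResources is now two-phase: a first pass only warns a container's AM (the warning surfaces as the PreemptionMessage built in ApplicationMasterService above), and a later pass, once waitTimeBeforeKill ms have elapsed, force-kills whatever was warned and is still running. A condensed sketch of the flow, using method names from this patch (queues and toPreempt are illustrative):

    // Round 1: containers over their queue's fair share are warned, not killed.
    scheduler.preemptResources(queues, toPreempt);

    // ... waitTimeBeforeKill ms pass; a well-behaved AM checkpoints its work
    // and releases the warned containers itself ...

    // Round 2: anything warned earlier and still running is killed through
    // completedContainer(..., RMContainerEventType.KILL).
    scheduler.preemptResources(queues, toPreempt);
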
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java?rev=1480778&r1=1480777&r2=1480778&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java Thu May  9 21:21:42 2013
@@ -52,6 +52,11 @@ public class FairSchedulerConfiguration 
   /** Whether preemption is enabled. */
   protected static final String  PREEMPTION = CONF_PREFIX + "preemption";
   protected static final boolean DEFAULT_PREEMPTION = false;
+  
+  protected static final String PREEMPTION_INTERVAL = CONF_PREFIX + "preemptionInterval";
+  protected static final int DEFAULT_PREEMPTION_INTERVAL = 5000;
+  protected static final String WAIT_TIME_BEFORE_KILL = CONF_PREFIX + "waitTimeBeforeKill";
+  protected static final int DEFAULT_WAIT_TIME_BEFORE_KILL = 15000;
 
   /** Whether to assign multiple containers in one check-in. */
   protected static final String  ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple";
@@ -120,4 +125,12 @@ public class FairSchedulerConfiguration 
     return get(EVENT_LOG_DIR, new File(System.getProperty("hadoop.log.dir",
     		"/tmp/")).getAbsolutePath() + File.separator + "fairscheduler");
   }
+  
+  public int getPreemptionInterval() {
+    return getInt(PREEMPTION_INTERVAL, DEFAULT_PREEMPTION_INTERVAL);
+  }
+  
+  public int getWaitTimeBeforeKill() {
+    return getInt(WAIT_TIME_BEFORE_KILL, DEFAULT_WAIT_TIME_BEFORE_KILL);
+  }
 }

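For reference, the two new knobs above sit under the fair scheduler's configuration prefix; CONF_PREFIX is not shown in this hunk, so the yarn.scheduler.fair. prefix below is an assumption. A sketch of setting them next to the existing preemption switch:

    Configuration conf = new Configuration();
    // Assumes CONF_PREFIX == "yarn.scheduler.fair."
    conf.setBoolean("yarn.scheduler.fair.preemption", true);
    // Interval between preemption checks (default 5000 ms).
    conf.setInt("yarn.scheduler.fair.preemptionInterval", 5000);
    // Grace period between warning an AM and force-killing (default 15000 ms).
    conf.setInt("yarn.scheduler.fair.waitTimeBeforeKill", 15000);
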
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1480778&r1=1480777&r2=1480778&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Thu May  9 21:21:42 2013
@@ -30,6 +30,7 @@ import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -891,9 +892,16 @@ public class TestFairScheduler {
    */
   public void testChoiceOfPreemptedContainers() throws Exception {
     Configuration conf = createConfiguration();
+    
+    conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
+    conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000); 
+    
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE);
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-
+    
+    MockClock clock = new MockClock();
+    scheduler.setClock(clock);
+    
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
     out.println("<?xml version=\"1.0\"?>");
     out.println("<allocations>");
@@ -988,15 +996,38 @@ public class TestFairScheduler {
         Resources.createResource(2 * 1024));
     assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
     assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size());
-    assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
     assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size());
     assertEquals(1, scheduler.applications.get(app5).getLiveContainers().size());
+    
+    // First verify that containers are being added to the applications' preemption lists
+    assertTrue(!Collections.disjoint(scheduler.applications.get(app3).getLiveContainers(),
+                                     scheduler.applications.get(app3).getPreemptionContainers()));
+    assertTrue(!Collections.disjoint(scheduler.applications.get(app6).getLiveContainers(),
+                                     scheduler.applications.get(app6).getPreemptionContainers()));
+
+    // Pretend 15 seconds have passed
+    clock.tick(15);
+
+    // Trigger a kill by insisting we want containers back
+    scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
+        Resources.createResource(2 * 1024));
+
+    // At this point the containers should have been killed (since we are not simulating the AM)
     assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size());
+    assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
+
+    // Ask for more containers back; this round only issues preemption warnings
+    scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
+        Resources.createResource(2 * 1024));
+
+    // Pretend 15 seconds have passed
+    clock.tick(15);
 
     // We should be able to claw back another container from A and B each.
     // Make sure it is lowest priority container.
     scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
         Resources.createResource(2 * 1024));
+    
     assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
     assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size());
     assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());