You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2017/02/27 15:15:12 UTC

[16/31] hadoop git commit: YARN-4779. Fix AM container allocation logic in SLS. Contributed by Wangda Tan.

YARN-4779. Fix AM container allocation logic in SLS. Contributed by Wangda Tan.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b32ffa27
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b32ffa27
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b32ffa27

Branch: refs/heads/HADOOP-13345
Commit: b32ffa2753e83615b980721b6067fcc35ce54372
Parents: e8694de
Author: Sunil G <su...@apache.org>
Authored: Fri Feb 24 21:39:25 2017 +0530
Committer: Sunil G <su...@apache.org>
Committed: Fri Feb 24 21:39:25 2017 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/yarn/sls/SLSRunner.java   |  20 +-
 .../hadoop/yarn/sls/appmaster/AMSimulator.java  |  89 +++++---
 .../yarn/sls/appmaster/MRAMSimulator.java       | 218 ++++++++-----------
 .../sls/resourcemanager/MockAMLauncher.java     | 115 ++++++++++
 .../sls/scheduler/SLSCapacityScheduler.java     |  24 ++
 5 files changed, 305 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b32ffa27/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 61738fb..61b7f36 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -55,12 +56,14 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
 import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
+import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher;
 import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
 import org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper;
 import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler;
@@ -119,10 +122,10 @@ public class SLSRunner {
     this.printSimulation = printsimulation;
     metricsOutputDir = outputDir;
     
-    nmMap = new HashMap<NodeId, NMSimulator>();
-    queueAppNumMap = new HashMap<String, Integer>();
-    amMap = new HashMap<String, AMSimulator>();
-    amClassMap = new HashMap<String, Class>();
+    nmMap = new HashMap<>();
+    queueAppNumMap = new HashMap<>();
+    amMap = new ConcurrentHashMap<>();
+    amClassMap = new HashMap<>();
     
     // runner configuration
     conf = new Configuration(false);
@@ -179,7 +182,14 @@ public class SLSRunner {
     }
 
     rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir);
-    rm = new ResourceManager();
+
+    final SLSRunner se = this;
+    rm = new ResourceManager() {
+      @Override
+      protected ApplicationMasterLauncher createAMLauncher() {
+        return new MockAMLauncher(se, this.rmContext, amMap);
+      }
+    };
     rm.init(rmConf);
     rm.start();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b32ffa27/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
index d61bf02..5b03d51 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -66,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.log4j.Logger;
 
 import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
@@ -107,11 +109,19 @@ public abstract class AMSimulator extends TaskRunner.Task {
   // progress
   protected int totalContainers;
   protected int finishedContainers;
+
+  // waiting for AM container
+  volatile boolean isAMContainerRunning = false;
+  volatile Container amContainer;
   
   protected final Logger LOG = Logger.getLogger(AMSimulator.class);
-  
+
+  // resource for AM container
+  private final static int MR_AM_CONTAINER_RESOURCE_MEMORY_MB = 1024;
+  private final static int MR_AM_CONTAINER_RESOURCE_VCORES = 1;
+
   public AMSimulator() {
-    this.responseQueue = new LinkedBlockingQueue<AllocateResponse>();
+    this.responseQueue = new LinkedBlockingQueue<>();
   }
 
   public void init(int id, int heartbeatInterval, 
@@ -142,23 +152,30 @@ public abstract class AMSimulator extends TaskRunner.Task {
     // submit application, waiting until ACCEPTED
     submitApp();
 
-    // register application master
-    registerAM();
-
     // track app metrics
     trackApp();
   }
 
+  public synchronized void notifyAMContainerLaunched(Container masterContainer)
+      throws Exception {
+    this.amContainer = masterContainer;
+    this.appAttemptId = masterContainer.getId().getApplicationAttemptId();
+    registerAM();
+    isAMContainerRunning = true;
+  }
+
   @Override
   public void middleStep() throws Exception {
-    // process responses in the queue
-    processResponseQueue();
-    
-    // send out request
-    sendContainerRequest();
-    
-    // check whether finish
-    checkStop();
+    if (isAMContainerRunning) {
+      // process responses in the queue
+      processResponseQueue();
+
+      // send out request
+      sendContainerRequest();
+
+      // check whether finish
+      checkStop();
+    }
   }
 
   @Override
@@ -168,6 +185,22 @@ public abstract class AMSimulator extends TaskRunner.Task {
     if (isTracked) {
       untrackApp();
     }
+
+    // Finish AM container
+    if (amContainer != null) {
+      LOG.info("AM container = " + amContainer.getId() + " reported to finish");
+      se.getNmMap().get(amContainer.getNodeId()).cleanupContainer(
+          amContainer.getId());
+    } else {
+      LOG.info("AM container is null");
+    }
+
+    if (null == appAttemptId) {
+      // If appAttemptId == null, AM is not launched from RM's perspective, so
+      // it's unnecessary to finish am as well
+      return;
+    }
+
     // unregister application master
     final FinishApplicationMasterRequest finishAMRequest = recordFactory
                   .newRecordInstance(FinishApplicationMasterRequest.class);
@@ -256,7 +289,9 @@ public abstract class AMSimulator extends TaskRunner.Task {
     conLauContext.setLocalResources(new HashMap<String, LocalResource>());
     conLauContext.setServiceData(new HashMap<String, ByteBuffer>());
     appSubContext.setAMContainerSpec(conLauContext);
-    appSubContext.setUnmanagedAM(true);
+    appSubContext.setResource(Resources
+        .createResource(MR_AM_CONTAINER_RESOURCE_MEMORY_MB,
+            MR_AM_CONTAINER_RESOURCE_VCORES));
     subAppRequest.setApplicationSubmissionContext(appSubContext);
     UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
     ugi.doAs(new PrivilegedExceptionAction<Object>() {
@@ -267,22 +302,6 @@ public abstract class AMSimulator extends TaskRunner.Task {
       }
     });
     LOG.info(MessageFormat.format("Submit a new application {0}", appId));
-    
-    // waiting until application ACCEPTED
-    RMApp app = rm.getRMContext().getRMApps().get(appId);
-    while(app.getState() != RMAppState.ACCEPTED) {
-      Thread.sleep(10);
-    }
-
-    // Waiting until application attempt reach LAUNCHED
-    // "Unmanaged AM must register after AM attempt reaches LAUNCHED state"
-    this.appAttemptId = rm.getRMContext().getRMApps().get(appId)
-        .getCurrentAppAttempt().getAppAttemptId();
-    RMAppAttempt rmAppAttempt = rm.getRMContext().getRMApps().get(appId)
-        .getCurrentAppAttempt();
-    while (rmAppAttempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED) {
-      Thread.sleep(10);
-    }
   }
 
   private void registerAM()
@@ -335,7 +354,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
     for (ContainerSimulator cs : csList) {
       String rackHostNames[] = SLSUtils.getRackHostName(cs.getHostname());
       // check rack local
-      String rackname = rackHostNames[0];
+      String rackname = "/" + rackHostNames[0];
       if (rackLocalRequestMap.containsKey(rackname)) {
         rackLocalRequestMap.get(rackname).setNumContainers(
             rackLocalRequestMap.get(rackname).getNumContainers() + 1);
@@ -383,4 +402,12 @@ public abstract class AMSimulator extends TaskRunner.Task {
   public int getNumTasks() {
     return totalContainers;
   }
+
+  public ApplicationId getApplicationId() {
+    return appId;
+  }
+
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return appAttemptId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b32ffa27/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
index da267a1..e726b09 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
@@ -27,6 +27,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.avro.Protocol;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -63,10 +64,10 @@ public class MRAMSimulator extends AMSimulator {
   
   private static final int PRIORITY_REDUCE = 10;
   private static final int PRIORITY_MAP = 20;
-  
+
   // pending maps
   private LinkedList<ContainerSimulator> pendingMaps =
-          new LinkedList<ContainerSimulator>();
+          new LinkedList<>();
   
   // pending failed maps
   private LinkedList<ContainerSimulator> pendingFailedMaps =
@@ -107,14 +108,9 @@ public class MRAMSimulator extends AMSimulator {
   private int mapTotal = 0;
   private int reduceFinished = 0;
   private int reduceTotal = 0;
-  // waiting for AM container 
-  private boolean isAMContainerRunning = false;
-  private Container amContainer;
+
   // finished
   private boolean isFinished = false;
-  // resource for AM container
-  private final static int MR_AM_CONTAINER_RESOURCE_MEMORY_MB = 1024;
-  private final static int MR_AM_CONTAINER_RESOURCE_VCORES = 1;
 
   public final Logger LOG = Logger.getLogger(MRAMSimulator.class);
 
@@ -131,83 +127,34 @@ public class MRAMSimulator extends AMSimulator {
     for (ContainerSimulator cs : containerList) {
       if (cs.getType().equals("map")) {
         cs.setPriority(PRIORITY_MAP);
-        pendingMaps.add(cs);
+        allMaps.add(cs);
       } else if (cs.getType().equals("reduce")) {
         cs.setPriority(PRIORITY_REDUCE);
-        pendingReduces.add(cs);
+        allReduces.add(cs);
       }
     }
-    allMaps.addAll(pendingMaps);
-    allReduces.addAll(pendingReduces);
-    mapTotal = pendingMaps.size();
-    reduceTotal = pendingReduces.size();
+
+    LOG.info(MessageFormat
+        .format("Added new job with {0} mapper and {1} reducers",
+            allMaps.size(), allReduces.size()));
+
+    mapTotal = allMaps.size();
+    reduceTotal = allReduces.size();
     totalContainers = mapTotal + reduceTotal;
   }
 
   @Override
-  public void firstStep() throws Exception {
-    super.firstStep();
-    
-    requestAMContainer();
-  }
-
-  /**
-   * send out request for AM container
-   */
-  protected void requestAMContainer()
-          throws YarnException, IOException, InterruptedException {
-    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
-    ResourceRequest amRequest = createResourceRequest(
-            BuilderUtils.newResource(MR_AM_CONTAINER_RESOURCE_MEMORY_MB,
-                    MR_AM_CONTAINER_RESOURCE_VCORES),
-            ResourceRequest.ANY, 1, 1);
-    ask.add(amRequest);
-    LOG.debug(MessageFormat.format("Application {0} sends out allocate " +
-            "request for its AM", appId));
-    final AllocateRequest request = this.createAllocateRequest(ask);
-
-    UserGroupInformation ugi =
-            UserGroupInformation.createRemoteUser(appAttemptId.toString());
-    Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps()
-            .get(appAttemptId.getApplicationId())
-            .getRMAppAttempt(appAttemptId).getAMRMToken();
-    ugi.addTokenIdentifier(token.decodeIdentifier());
-    AllocateResponse response = ugi.doAs(
-            new PrivilegedExceptionAction<AllocateResponse>() {
-      @Override
-      public AllocateResponse run() throws Exception {
-        return rm.getApplicationMasterService().allocate(request);
-      }
-    });
-    if (response != null) {
-      responseQueue.put(response);
+  public synchronized void notifyAMContainerLaunched(Container masterContainer)
+      throws Exception {
+    if (null != masterContainer) {
+      restart();
+      super.notifyAMContainerLaunched(masterContainer);
     }
   }
 
   @Override
   @SuppressWarnings("unchecked")
-  protected void processResponseQueue()
-          throws InterruptedException, YarnException, IOException {
-    // Check whether receive the am container
-    if (!isAMContainerRunning) {
-      if (!responseQueue.isEmpty()) {
-        AllocateResponse response = responseQueue.take();
-        if (response != null
-            && !response.getAllocatedContainers().isEmpty()) {
-          // Get AM container
-          Container container = response.getAllocatedContainers().get(0);
-          se.getNmMap().get(container.getNodeId())
-              .addNewContainer(container, -1L);
-          // Start AM container
-          amContainer = container;
-          LOG.debug(MessageFormat.format("Application {0} starts its " +
-              "AM container ({1}).", appId, amContainer.getId()));
-          isAMContainerRunning = true;
-        }
-      }
-      return;
-    }
-
+  protected void processResponseQueue() throws Exception {
     while (! responseQueue.isEmpty()) {
       AllocateResponse response = responseQueue.take();
 
@@ -228,12 +175,16 @@ public class MRAMSimulator extends AMSimulator {
               assignedReduces.remove(containerId);
               reduceFinished ++;
               finishedContainers ++;
-            } else {
+            } else if (amContainer.getId().equals(containerId)){
               // am container released event
               isFinished = true;
               LOG.info(MessageFormat.format("Application {0} goes to " +
                       "finish.", appId));
             }
+
+            if (mapFinished >= mapTotal && reduceFinished >= reduceTotal) {
+              lastStep();
+            }
           } else {
             // container to be killed
             if (assignedMaps.containsKey(containerId)) {
@@ -244,10 +195,9 @@ public class MRAMSimulator extends AMSimulator {
               LOG.debug(MessageFormat.format("Application {0} has one " +
                       "reducer killed ({1}).", appId, containerId));
               pendingFailedReduces.add(assignedReduces.remove(containerId));
-            } else {
+            } else if (amContainer.getId().equals(containerId)){
               LOG.info(MessageFormat.format("Application {0}'s AM is " +
-                      "going to be killed. Restarting...", appId));
-              restart();
+                      "going to be killed. Waiting for rescheduling...", appId));
             }
           }
         }
@@ -255,11 +205,8 @@ public class MRAMSimulator extends AMSimulator {
       
       // check finished
       if (isAMContainerRunning &&
-              (mapFinished == mapTotal) &&
-              (reduceFinished == reduceTotal)) {
-        // to release the AM container
-        se.getNmMap().get(amContainer.getNodeId())
-                .cleanupContainer(amContainer.getId());
+              (mapFinished >= mapTotal) &&
+              (reduceFinished >= reduceTotal)) {
         isAMContainerRunning = false;
         LOG.debug(MessageFormat.format("Application {0} sends out event " +
                 "to clean up its AM container.", appId));
@@ -293,21 +240,38 @@ public class MRAMSimulator extends AMSimulator {
    */
   private void restart()
           throws YarnException, IOException, InterruptedException {
-    // clear 
-    finishedContainers = 0;
+    // clear
     isFinished = false;
-    mapFinished = 0;
-    reduceFinished = 0;
     pendingFailedMaps.clear();
     pendingMaps.clear();
     pendingReduces.clear();
     pendingFailedReduces.clear();
-    pendingMaps.addAll(allMaps);
-    pendingReduces.addAll(pendingReduces);
-    isAMContainerRunning = false;
+
+    // Only add totalMaps - finishedMaps
+    int added = 0;
+    for (ContainerSimulator cs : allMaps) {
+      if (added >= mapTotal - mapFinished) {
+        break;
+      }
+      pendingMaps.add(cs);
+    }
+
+    // And same, only add totalReduces - finishedReduces
+    added = 0;
+    for (ContainerSimulator cs : allReduces) {
+      if (added >= reduceTotal - reduceFinished) {
+        break;
+      }
+      pendingReduces.add(cs);
+    }
     amContainer = null;
-    // resent am container request
-    requestAMContainer();
+  }
+
+  private List<ContainerSimulator> mergeLists(List<ContainerSimulator> left, List<ContainerSimulator> right) {
+    List<ContainerSimulator> list = new ArrayList<>();
+    list.addAll(left);
+    list.addAll(right);
+    return list;
   }
 
   @Override
@@ -319,44 +283,48 @@ public class MRAMSimulator extends AMSimulator {
 
     // send out request
     List<ResourceRequest> ask = null;
-    if (isAMContainerRunning) {
-      if (mapFinished != mapTotal) {
-        // map phase
-        if (! pendingMaps.isEmpty()) {
-          ask = packageRequests(pendingMaps, PRIORITY_MAP);
-          LOG.debug(MessageFormat.format("Application {0} sends out " +
-                  "request for {1} mappers.", appId, pendingMaps.size()));
-          scheduledMaps.addAll(pendingMaps);
-          pendingMaps.clear();
-        } else if (! pendingFailedMaps.isEmpty() && scheduledMaps.isEmpty()) {
-          ask = packageRequests(pendingFailedMaps, PRIORITY_MAP);
-          LOG.debug(MessageFormat.format("Application {0} sends out " +
-                  "requests for {1} failed mappers.", appId,
-                  pendingFailedMaps.size()));
-          scheduledMaps.addAll(pendingFailedMaps);
-          pendingFailedMaps.clear();
-        }
-      } else if (reduceFinished != reduceTotal) {
-        // reduce phase
-        if (! pendingReduces.isEmpty()) {
-          ask = packageRequests(pendingReduces, PRIORITY_REDUCE);
-          LOG.debug(MessageFormat.format("Application {0} sends out " +
-                  "requests for {1} reducers.", appId, pendingReduces.size()));
-          scheduledReduces.addAll(pendingReduces);
-          pendingReduces.clear();
-        } else if (! pendingFailedReduces.isEmpty()
-                && scheduledReduces.isEmpty()) {
-          ask = packageRequests(pendingFailedReduces, PRIORITY_REDUCE);
-          LOG.debug(MessageFormat.format("Application {0} sends out " +
-                  "request for {1} failed reducers.", appId,
-                  pendingFailedReduces.size()));
-          scheduledReduces.addAll(pendingFailedReduces);
-          pendingFailedReduces.clear();
-        }
+    if (mapFinished != mapTotal) {
+      // map phase
+      if (!pendingMaps.isEmpty()) {
+        ask = packageRequests(mergeLists(pendingMaps, scheduledMaps),
+            PRIORITY_MAP);
+        LOG.debug(MessageFormat
+            .format("Application {0} sends out " + "request for {1} mappers.",
+                appId, pendingMaps.size()));
+        scheduledMaps.addAll(pendingMaps);
+        pendingMaps.clear();
+      } else if (!pendingFailedMaps.isEmpty()) {
+        ask = packageRequests(mergeLists(pendingFailedMaps, scheduledMaps),
+            PRIORITY_MAP);
+        LOG.debug(MessageFormat.format(
+            "Application {0} sends out " + "requests for {1} failed mappers.",
+            appId, pendingFailedMaps.size()));
+        scheduledMaps.addAll(pendingFailedMaps);
+        pendingFailedMaps.clear();
+      }
+    } else if (reduceFinished != reduceTotal) {
+      // reduce phase
+      if (!pendingReduces.isEmpty()) {
+        ask = packageRequests(mergeLists(pendingReduces, scheduledReduces),
+            PRIORITY_REDUCE);
+        LOG.debug(MessageFormat
+            .format("Application {0} sends out " + "requests for {1} reducers.",
+                appId, pendingReduces.size()));
+        scheduledReduces.addAll(pendingReduces);
+        pendingReduces.clear();
+      } else if (!pendingFailedReduces.isEmpty()) {
+        ask = packageRequests(mergeLists(pendingFailedReduces, scheduledReduces),
+            PRIORITY_REDUCE);
+        LOG.debug(MessageFormat.format(
+            "Application {0} sends out " + "request for {1} failed reducers.",
+            appId, pendingFailedReduces.size()));
+        scheduledReduces.addAll(pendingFailedReduces);
+        pendingFailedReduces.clear();
       }
     }
+
     if (ask == null) {
-      ask = new ArrayList<ResourceRequest>();
+      ask = new ArrayList<>();
     }
     
     final AllocateRequest request = createAllocateRequest(ask);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b32ffa27/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
new file mode 100644
index 0000000..20cf3e5
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.resourcemanager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import org.apache.hadoop.yarn.sls.SLSRunner;
+import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
+
+import java.util.Map;
+
+public class MockAMLauncher extends ApplicationMasterLauncher
+    implements EventHandler<AMLauncherEvent> {
+  private static final Log LOG = LogFactory.getLog(
+      MockAMLauncher.class);
+
+  Map<String, AMSimulator> amMap;
+  SLSRunner se;
+
+  public MockAMLauncher(SLSRunner se, RMContext rmContext,
+      Map<String, AMSimulator> amMap) {
+    super(rmContext);
+    this.amMap = amMap;
+    this.se = se;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    // Do nothing
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    // Do nothing
+  }
+
+  private void setupAMRMToken(RMAppAttempt appAttempt) {
+    // Setup AMRMToken
+    Token<AMRMTokenIdentifier> amrmToken =
+        super.context.getAMRMTokenSecretManager().createAndGetAMRMToken(
+            appAttempt.getAppAttemptId());
+    ((RMAppAttemptImpl) appAttempt).setAMRMToken(amrmToken);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void handle(AMLauncherEvent event) {
+    if (AMLauncherEventType.LAUNCH == event.getType()) {
+      ApplicationId appId =
+          event.getAppAttempt().getAppAttemptId().getApplicationId();
+
+      // find AMSimulator
+      for (AMSimulator ams : amMap.values()) {
+        if (ams.getApplicationId() != null && ams.getApplicationId().equals(
+            appId)) {
+          try {
+            Container amContainer = event.getAppAttempt().getMasterContainer();
+
+            setupAMRMToken(event.getAppAttempt());
+
+            // Notify RMAppAttempt to change state
+            super.context.getDispatcher().getEventHandler().handle(
+                new RMAppAttemptEvent(event.getAppAttempt().getAppAttemptId(),
+                    RMAppAttemptEventType.LAUNCHED));
+
+            ams.notifyAMContainerLaunched(
+                event.getAppAttempt().getMasterContainer());
+            LOG.info("Notify AM launcher launched:" + amContainer.getId());
+
+            se.getNmMap().get(amContainer.getNodeId())
+                .addNewContainer(amContainer, 100000000L);
+
+            return;
+          } catch (Exception e) {
+            throw new YarnRuntimeException(e);
+          }
+        }
+      }
+
+      throw new YarnRuntimeException(
+          "Didn't find any AMSimulator for applicationId=" + appId);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b32ffa27/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
index 8388273..cd4377e 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
@@ -556,6 +556,30 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
         }
       }
     );
+    metrics.register("variable.cluster.reserved.memory",
+        new Gauge<Long>() {
+          @Override
+          public Long getValue() {
+            if(getRootQueueMetrics() == null) {
+              return 0L;
+            } else {
+              return getRootQueueMetrics().getReservedMB();
+            }
+          }
+        }
+    );
+    metrics.register("variable.cluster.reserved.vcores",
+        new Gauge<Integer>() {
+          @Override
+          public Integer getValue() {
+            if(getRootQueueMetrics() == null) {
+              return 0;
+            } else {
+              return getRootQueueMetrics().getReservedVirtualCores();
+            }
+          }
+        }
+    );
   }
 
   private void registerContainerAppNumMetrics() {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org