You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ec...@apache.org on 2015/09/15 21:12:19 UTC

[34/50] [abbrv] hadoop git commit: YARN-2005. Blacklisting support for scheduling AMs. (Anubhav Dhoot via kasha)

YARN-2005. Blacklisting support for scheduling AMs. (Anubhav Dhoot via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/81df7b58
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/81df7b58
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/81df7b58

Branch: refs/heads/HADOOP-11890
Commit: 81df7b586a16f8226c7b01c139c1c70c060399c3
Parents: 7269906
Author: Karthik Kambatla <ka...@apache.org>
Authored: Sun Sep 13 17:03:15 2015 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Sun Sep 13 17:03:15 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   2 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |   9 ++
 .../src/main/resources/yarn-default.xml         |  18 +++
 .../blacklist/BlacklistManager.java             |  47 ++++++
 .../blacklist/BlacklistUpdates.java             |  47 ++++++
 .../blacklist/DisabledBlacklistManager.java     |  45 ++++++
 .../blacklist/SimpleBlacklistManager.java       |  84 +++++++++++
 .../server/resourcemanager/rmapp/RMAppImpl.java |  32 +++-
 .../rmapp/attempt/RMAppAttempt.java             |   7 +
 .../rmapp/attempt/RMAppAttemptImpl.java         |  58 +++++++-
 .../scheduler/AppSchedulingInfo.java            |  78 +++++++---
 .../scheduler/SchedulerApplicationAttempt.java  |  37 +++--
 .../scheduler/capacity/CapacityScheduler.java   |   9 +-
 .../common/fica/FiCaSchedulerUtils.java         |  48 ------
 .../scheduler/fair/FairScheduler.java           |   9 +-
 .../scheduler/fifo/FifoScheduler.java           |  11 +-
 .../yarn/server/resourcemanager/MockRM.java     |  14 +-
 .../applicationsmanager/TestAMRestart.java      | 149 +++++++++++++++++--
 .../blacklist/TestBlacklistManager.java         | 118 +++++++++++++++
 .../TestRMAppLogAggregationStatus.java          |   2 +-
 .../rmapp/TestRMAppTransitions.java             |   2 +-
 .../capacity/TestCapacityScheduler.java         |  25 ++++
 .../scheduler/fair/FairSchedulerTestBase.java   |   2 +-
 23 files changed, 741 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/81df7b58/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 3246946..4a3a666 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -430,6 +430,8 @@ Release 2.8.0 - UNRELEASED
     YARN-4145. Make RMHATestBase abstract so its not run when running all
     tests under that namespace (adhoot via rkanter)
 
+    YARN-2005. Blacklisting support for scheduling AMs. (Anubhav Dhoot via kasha)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81df7b58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 9ec25ae..cc4f5de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2025,6 +2025,15 @@ public class YarnConfiguration extends Configuration {
   public static final String NM_PROVIDER_CONFIGURED_NODE_LABELS =
       NM_NODE_LABELS_PROVIDER_PREFIX + "configured-node-labels";
 
+  public static final String AM_BLACKLISTING_ENABLED =
+      YARN_PREFIX + "am.blacklisting.enabled";
+  public static final boolean DEFAULT_AM_BLACKLISTING_ENABLED = true;
+
+  public static final String AM_BLACKLISTING_DISABLE_THRESHOLD =
+      YARN_PREFIX + "am.blacklisting.disable-failure-threshold";
+  public static final float DEFAULT_AM_BLACKLISTING_DISABLE_THRESHOLD = 0.8f;
+
+
   public YarnConfiguration() {
     super();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81df7b58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index b76defb..bcd64c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2293,4 +2293,22 @@
     <value>org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor</value>
   </property>
 
+ <property>
+    <description>
+    Enable/disable blacklisting of hosts for AM based on AM failures on those
+    hosts.
+    </description>
+    <name>yarn.am.blacklisting.enabled</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <description>
+    Ratio of NodeManager hosts that are allowed to be blacklisted for AM.
+    Once the blacklisted hosts reach this ratio, blacklisting is disabled to
+    avoid the danger of blacklisting the entire cluster.
+    </description>
+    <name>yarn.am.blacklisting.disable-failure-threshold</name>
+    <value>0.8f</value>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81df7b58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistManager.java
new file mode 100644
index 0000000..f03b421
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistManager.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.blacklist;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+/**
+ * Tracks blacklists based on failures reported on nodes.
+ */
+@Private
+public interface BlacklistManager {
+
+  /**
+   * Report failure of a container on node.
+   * @param node that has a container failure
+   */
+  void addNode(String node);
+
+  /**
+   * Get {@link BlacklistUpdates} that indicate which nodes should be
+   * added to or removed from the blacklist.
+   * @return {@link BlacklistUpdates}
+   */
+  BlacklistUpdates getBlacklistUpdates();
+
+  /**
+   * Refresh the number of nodemanager hosts available for scheduling.
+   * @param nodeHostCount is the number of node hosts.
+   */
+  void refreshNodeHostCount(int nodeHostCount);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81df7b58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistUpdates.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistUpdates.java
new file mode 100644
index 0000000..c76dfb4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistUpdates.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.blacklist;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+import java.util.List;
+
+/**
+ * Class to track blacklist additions and removals.
+ */
+@Private
+public class BlacklistUpdates {
+
+  private List<String> additions;
+  private List<String> removals;
+
+  public BlacklistUpdates(List<String> additions,
+      List<String> removals) {
+    this.additions = additions;
+    this.removals = removals;
+  }
+
+  public List<String> getAdditions() {
+    return additions;
+  }
+
+  public List<String> getRemovals() {
+    return removals;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81df7b58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/DisabledBlacklistManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/DisabledBlacklistManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/DisabledBlacklistManager.java
new file mode 100644
index 0000000..f155b45
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/DisabledBlacklistManager.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.blacklist;
+
+import java.util.ArrayList;
+
+/**
+ * A {@link BlacklistManager} that returns no blacklists.
+ */
+public class DisabledBlacklistManager implements BlacklistManager{
+
+  private static final ArrayList<String> EMPTY_LIST = new ArrayList<String>();
+  private BlacklistUpdates noBlacklist =
+      new BlacklistUpdates(EMPTY_LIST, EMPTY_LIST);
+
+  @Override
+  public void addNode(String node) {
+  }
+
+  @Override
+  public BlacklistUpdates getBlacklistUpdates() {
+    return noBlacklist;
+  }
+
+  @Override
+  public void refreshNodeHostCount(int nodeHostCount) {
+    // Do nothing
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81df7b58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/SimpleBlacklistManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/SimpleBlacklistManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/SimpleBlacklistManager.java
new file mode 100644
index 0000000..a544ab8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/SimpleBlacklistManager.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.blacklist;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Maintains a set of failed nodes and returns it as blacklist additions while
+ * its size is below a threshold fraction of the total nodes. Once the size
+ * reaches that threshold, all the nodes are returned as blacklist removals
+ * instead, so previous additions are reversed.
+ */
+public class SimpleBlacklistManager implements BlacklistManager {
+
+  private int numberOfNodeManagerHosts;
+  private final double blacklistDisableFailureThreshold;
+  private final Set<String> blacklistNodes = new HashSet<>();
+  private static final ArrayList<String> EMPTY_LIST = new ArrayList<>();
+
+  private static final Log LOG = LogFactory.getLog(SimpleBlacklistManager.class);
+
+  public SimpleBlacklistManager(int numberOfNodeManagerHosts,
+      double blacklistDisableFailureThreshold) {
+    this.numberOfNodeManagerHosts = numberOfNodeManagerHosts;
+    this.blacklistDisableFailureThreshold = blacklistDisableFailureThreshold;
+  }
+
+  @Override
+  public void addNode(String node) {
+    blacklistNodes.add(node);
+  }
+
+  @Override
+  public void refreshNodeHostCount(int nodeHostCount) {
+    this.numberOfNodeManagerHosts = nodeHostCount;
+  }
+
+  @Override
+  public BlacklistUpdates getBlacklistUpdates() {
+    BlacklistUpdates ret;
+    List<String> blacklist = new ArrayList<>(blacklistNodes);
+    final int currentBlacklistSize = blacklist.size();
+    final double failureThreshold = this.blacklistDisableFailureThreshold *
+        numberOfNodeManagerHosts;
+    if (currentBlacklistSize < failureThreshold) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("blacklist size " + currentBlacklistSize + " is less than " +
+            "failure threshold ratio " + blacklistDisableFailureThreshold +
+            " out of total usable nodes " + numberOfNodeManagerHosts);
+      }
+      ret = new BlacklistUpdates(blacklist, EMPTY_LIST);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("blacklist size " + currentBlacklistSize + " is more than " +
+            "failure threshold ratio " + blacklistDisableFailureThreshold +
+            " out of total usable nodes " + numberOfNodeManagerHosts);
+      }
+      ret = new BlacklistUpdates(EMPTY_LIST, blacklist);
+    }
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81df7b58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 2eb74f7..7cf39b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -74,6 +74,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
+import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
+import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
@@ -133,6 +136,8 @@ public class RMAppImpl implements RMApp, Recoverable {
   private final Set<String> applicationTags;
 
   private final long attemptFailuresValidityInterval;
+  private final boolean amBlacklistingEnabled;
+  private final float blacklistDisableThreshold;
 
   private Clock systemClock;
 
@@ -456,6 +461,18 @@ public class RMAppImpl implements RMApp, Recoverable {
     maxLogAggregationDiagnosticsInMemory = conf.getInt(
         YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY,
         YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY);
+
+    amBlacklistingEnabled = conf.getBoolean(
+        YarnConfiguration.AM_BLACKLISTING_ENABLED,
+        YarnConfiguration.DEFAULT_AM_BLACKLISTING_ENABLED);
+
+    if (amBlacklistingEnabled) {
+      blacklistDisableThreshold = conf.getFloat(
+          YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD,
+          YarnConfiguration.DEFAULT_AM_BLACKLISTING_DISABLE_THRESHOLD);
+    } else {
+      blacklistDisableThreshold = 0.0f;
+    }
   }
 
   @Override
@@ -797,6 +814,18 @@ public class RMAppImpl implements RMApp, Recoverable {
   private void createNewAttempt() {
     ApplicationAttemptId appAttemptId =
         ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
+
+    BlacklistManager currentAMBlacklist;
+    if (currentAttempt != null) {
+      currentAMBlacklist = currentAttempt.getAMBlacklist();
+    } else {
+      if (amBlacklistingEnabled) {
+        currentAMBlacklist = new SimpleBlacklistManager(
+            scheduler.getNumClusterNodes(), blacklistDisableThreshold);
+      } else {
+        currentAMBlacklist = new DisabledBlacklistManager();
+      }
+    }
     RMAppAttempt attempt =
         new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
           submissionContext, conf,
@@ -804,7 +833,8 @@ public class RMAppImpl implements RMApp, Recoverable {
           // previously failed attempts(which should not include Preempted,
           // hardware error and NM resync) + 1) equal to the max-attempt
           // limit.
-          maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq);
+          maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq,
+          currentAMBlacklist);
     attempts.put(appAttemptId, attempt);
     currentAttempt = attempt;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81df7b58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
index b85174e..4dd8345 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 
 /**
@@ -185,6 +186,12 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
   ApplicationResourceUsageReport getApplicationResourceUsageReport();
 
   /**
+   * Get the {@link BlacklistManager} that manages blacklists for AM failures.
+   * @return the {@link BlacklistManager} that tracks AM failures.
+   */
+  BlacklistManager getAMBlacklist();
+
+  /**
    * the start time of the application.
    * @return the start time of the application.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81df7b58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 74a4000..629b2a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -36,7 +36,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import javax.crypto.SecretKey;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -71,6 +70,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
+import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistUpdates;
+import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@@ -182,6 +184,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
   
   private RMAppAttemptMetrics attemptMetrics = null;
   private ResourceRequest amReq = null;
+  private BlacklistManager blacklistedNodesForAM = null;
 
   private static final StateMachineFactory<RMAppAttemptImpl,
                                            RMAppAttemptState,
@@ -434,6 +437,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       ApplicationMasterService masterService,
       ApplicationSubmissionContext submissionContext,
       Configuration conf, boolean maybeLastAttempt, ResourceRequest amReq) {
+    this(appAttemptId, rmContext, scheduler, masterService, submissionContext,
+        conf, maybeLastAttempt, amReq, new DisabledBlacklistManager());
+  }
+
+  public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
+      RMContext rmContext, YarnScheduler scheduler,
+      ApplicationMasterService masterService,
+      ApplicationSubmissionContext submissionContext,
+      Configuration conf, boolean maybeLastAttempt, ResourceRequest amReq,
+      BlacklistManager amBlacklist) {
     this.conf = conf;
     this.applicationAttemptId = appAttemptId;
     this.rmContext = rmContext;
@@ -454,6 +467,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
         new RMAppAttemptMetrics(applicationAttemptId, rmContext);
     
     this.amReq = amReq;
+    this.blacklistedNodesForAM = amBlacklist;
   }
 
   @Override
@@ -939,12 +953,25 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
         appAttempt.amReq.setPriority(AM_CONTAINER_PRIORITY);
         appAttempt.amReq.setResourceName(ResourceRequest.ANY);
         appAttempt.amReq.setRelaxLocality(true);
-        
+
+        appAttempt.getAMBlacklist().refreshNodeHostCount(
+            appAttempt.scheduler.getNumClusterNodes());
+
+        BlacklistUpdates amBlacklist = appAttempt.getAMBlacklist()
+            .getBlacklistUpdates();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Using blacklist for AM: additions(" +
+              amBlacklist.getAdditions() + ") and removals(" +
+              amBlacklist.getRemovals() + ")");
+        }
         // AM resource has been checked when submission
         Allocation amContainerAllocation =
-            appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
+            appAttempt.scheduler.allocate(
+                appAttempt.applicationAttemptId,
                 Collections.singletonList(appAttempt.amReq),
-                EMPTY_CONTAINER_RELEASE_LIST, null, null);
+                EMPTY_CONTAINER_RELEASE_LIST,
+                amBlacklist.getAdditions(),
+                amBlacklist.getRemovals());
         if (amContainerAllocation != null
             && amContainerAllocation.getContainers() != null) {
           assert (amContainerAllocation.getContainers().size() == 0);
@@ -1331,7 +1358,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
     }
   }
 
-  private static final class UnmanagedAMAttemptSavedTransition 
+  private boolean shouldCountTowardsNodeBlacklisting(int exitStatus) {
+    return exitStatus == ContainerExitStatus.DISKS_FAILED;
+  }
+
+  private static final class UnmanagedAMAttemptSavedTransition
                                                 extends AMLaunchedTransition {
     @Override
     public void transition(RMAppAttemptImpl appAttempt,
@@ -1694,6 +1725,14 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
   private void sendAMContainerToNM(RMAppAttemptImpl appAttempt,
       RMAppAttemptContainerFinishedEvent containerFinishedEvent) {
     NodeId nodeId = containerFinishedEvent.getNodeId();
+    if (containerFinishedEvent.getContainerStatus() != null) {
+      if (shouldCountTowardsNodeBlacklisting(containerFinishedEvent
+          .getContainerStatus().getExitStatus())) {
+        appAttempt.addAMNodeToBlackList(containerFinishedEvent.getNodeId());
+      }
+    } else {
+      LOG.warn("No ContainerStatus in containerFinishedEvent");
+    }
     finishedContainersSentToAM.putIfAbsent(nodeId,
       new ArrayList<ContainerStatus>());
     appAttempt.finishedContainersSentToAM.get(nodeId).add(
@@ -1708,6 +1747,15 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
     }
   }
 
+  private void addAMNodeToBlackList(NodeId nodeId) {
+    blacklistedNodesForAM.addNode(nodeId.getHost().toString());
+  }
+
+  @Override
+  public BlacklistManager getAMBlacklist() {
+    return blacklistedNodesForAM;
+  }
+
   private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt,
       RMAppAttemptContainerFinishedEvent containerFinishedEvent) {
     appAttempt.justFinishedContainers.putIfAbsent(containerFinishedEvent

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81df7b58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 77ac5b3..e318d47 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -65,7 +65,8 @@ public class AppSchedulingInfo {
       new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
   final Map<Priority, Map<String, ResourceRequest>> requests =
     new ConcurrentHashMap<Priority, Map<String, ResourceRequest>>();
-  private Set<String> blacklist = new HashSet<String>();
+  private Set<String> userBlacklist = new HashSet<>();
+  private Set<String> amBlacklist = new HashSet<>();
 
   //private final ApplicationStore store;
   private ActiveUsersManager activeUsersManager;
@@ -217,21 +218,39 @@ public class AppSchedulingInfo {
   }
 
   /**
-   * The ApplicationMaster is updating the blacklist
+   * The ApplicationMaster is updating the userBlacklist used for containers
+   * other than AMs.
    *
-   * @param blacklistAdditions resources to be added to the blacklist
-   * @param blacklistRemovals resources to be removed from the blacklist
+   * @param blacklistAdditions resources to be added to the userBlacklist
+   * @param blacklistRemovals resources to be removed from the userBlacklist
    */
-  synchronized public void updateBlacklist(
+   public void updateBlacklist(
       List<String> blacklistAdditions, List<String> blacklistRemovals) {
-    // Add to blacklist
-    if (blacklistAdditions != null) {
-      blacklist.addAll(blacklistAdditions);
-    }
+     updateUserOrAMBlacklist(userBlacklist, blacklistAdditions,
+         blacklistRemovals);
+  }
+
+  /**
+   * RM is updating blacklist for AM containers.
+   * @param blacklistAdditions resources to be added to the amBlacklist
+   * @param blacklistRemovals resources to be removed from the amBlacklist
+   */
+  public void updateAMBlacklist(
+      List<String> blacklistAdditions, List<String> blacklistRemovals) {
+    updateUserOrAMBlacklist(amBlacklist, blacklistAdditions,
+        blacklistRemovals);
+  }
+
+  void updateUserOrAMBlacklist(Set<String> blacklist,
+      List<String> blacklistAdditions, List<String> blacklistRemovals) {
+    synchronized (blacklist) {
+      if (blacklistAdditions != null) {
+        blacklist.addAll(blacklistAdditions);
+      }
 
-    // Remove from blacklist
-    if (blacklistRemovals != null) {
-      blacklist.removeAll(blacklistRemovals);
+      if (blacklistRemovals != null) {
+        blacklist.removeAll(blacklistRemovals);
+      }
     }
   }
 
@@ -263,8 +282,23 @@ public class AppSchedulingInfo {
     return (request == null) ? null : request.getCapability();
   }
 
-  public synchronized boolean isBlacklisted(String resourceName) {
-    return blacklist.contains(resourceName);
+  /**
+   * Returns whether the resource is on the blacklist selected by the flag.
+   * @param resourceName the resource name to look up
+   * @param useAMBlacklist true to check amBlacklist, else userBlacklist
+   * @return true if it is on the selected blacklist
+   */
+  public boolean isBlacklisted(String resourceName,
+      boolean useAMBlacklist) {
+    if (useAMBlacklist){
+      synchronized (amBlacklist) {
+        return amBlacklist.contains(resourceName);
+      }
+    } else {
+      synchronized (userBlacklist) {
+        return userBlacklist.contains(resourceName);
+      }
+    }
   }
   
   /**
@@ -473,19 +507,25 @@ public class AppSchedulingInfo {
     this.queue = queue;
   }
 
-  public synchronized Set<String> getBlackList() {
-    return this.blacklist;
+  public Set<String> getBlackList() {
+    return this.userBlacklist;
   }
 
-  public synchronized Set<String> getBlackListCopy() {
-    return new HashSet<>(this.blacklist);
+  public Set<String> getBlackListCopy() {
+    synchronized (userBlacklist) {
+      return new HashSet<>(this.userBlacklist);
+    }
   }
 
   public synchronized void transferStateFromPreviousAppSchedulingInfo(
       AppSchedulingInfo appInfo) {
     //    this.priorities = appInfo.getPriorities();
     //    this.requests = appInfo.getRequests();
-    this.blacklist = appInfo.getBlackList();
+    // This should not require locking the userBlacklist since it will not be
+    // used by this instance until after setCurrentAppAttempt.
+    // TODO: clean this up to avoid sharing the set between instances;
+    // getBlackList can then be removed as well.
+    this.userBlacklist = appInfo.getBlackList();
   }
 
   public synchronized void recoverContainer(RMContainer rmContainer) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81df7b58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 4872543..b361d15 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -470,16 +470,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       RMContainer rmContainer = i.next();
       Container container = rmContainer.getContainer();
       ContainerType containerType = ContainerType.TASK;
-      // The working knowledge is that masterContainer for AM is null as it
-      // itself is the master container.
-      RMAppAttempt appAttempt =
-          rmContext
-              .getRMApps()
-              .get(
-                  container.getId().getApplicationAttemptId()
-                      .getApplicationId()).getCurrentAppAttempt();
-      if (appAttempt.getMasterContainer() == null
-          && appAttempt.getSubmissionContext().getUnmanagedAM() == false) {
+      boolean isWaitingForAMContainer = isWaitingForAMContainer(
+          container.getId().getApplicationAttemptId().getApplicationId());
+      if (isWaitingForAMContainer) {
         containerType = ContainerType.APPLICATION_MASTER;
       }
       try {
@@ -509,6 +502,16 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     return new ContainersAndNMTokensAllocation(returnContainerList, nmTokens);
   }
 
+  public boolean isWaitingForAMContainer(ApplicationId applicationId) {
+    // The working knowledge is that masterContainer for AM is null as it
+    // itself is the master container.
+    RMAppAttempt appAttempt =
+        rmContext.getRMApps().get(applicationId).getCurrentAppAttempt();
+    return (appAttempt != null && appAttempt.getMasterContainer() == null
+        && appAttempt.getSubmissionContext().getUnmanagedAM() == false);
+  }
+
+  // Blacklist used for user containers
   public synchronized void updateBlacklist(
       List<String> blacklistAdditions, List<String> blacklistRemovals) {
     if (!isStopped) {
@@ -516,9 +519,19 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
           blacklistAdditions, blacklistRemovals);
     }
   }
-  
+
+  // Blacklist used for AM containers
+  public synchronized void updateAMBlacklist(
+      List<String> blacklistAdditions, List<String> blacklistRemovals) {
+    if (!isStopped) {
+      this.appSchedulingInfo.updateAMBlacklist(
+          blacklistAdditions, blacklistRemovals);
+    }
+  }
+
   public boolean isBlacklisted(String resourceName) {
-    return this.appSchedulingInfo.isBlacklisted(resourceName);
+    boolean useAMBlacklist = isWaitingForAMContainer(getApplicationId());
+    return this.appSchedulingInfo.isBlacklisted(resourceName, useAMBlacklist);
   }
 
   public synchronized int addMissedNonPartitionedRequestSchedulingOpportunity(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81df7b58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index a7e9d8c..dbaccaf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -934,7 +933,13 @@ public class CapacityScheduler extends
         }
       }
 
-      application.updateBlacklist(blacklistAdditions, blacklistRemovals);
+      if (application.isWaitingForAMContainer(application.getApplicationId())) {
+        // Allocation is for the AM container, so update the AM blacklist
+        application.updateAMBlacklist(
+            blacklistAdditions, blacklistRemovals);
+      } else {
+        application.updateBlacklist(blacklistAdditions, blacklistRemovals);
+      }
 
       allocation = application.getAllocation(getResourceCalculator(),
                    clusterResource, getMinimumResourceCapability());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81df7b58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerUtils.java
deleted file mode 100644
index 9bece9b..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerUtils.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
-
-import org.apache.commons.logging.Log;
-
-public class FiCaSchedulerUtils {
-
-  public static  boolean isBlacklisted(FiCaSchedulerApp application,
-      FiCaSchedulerNode node, Log LOG) {
-    if (application.isBlacklisted(node.getNodeName())) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping 'host' " + node.getNodeName() + 
-            " for " + application.getApplicationId() + 
-            " since it has been blacklisted");
-      }
-      return true;
-    }
-
-    if (application.isBlacklisted(node.getRackName())) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping 'rack' " + node.getRackName() + 
-            " for " + application.getApplicationId() + 
-            " since it has been blacklisted");
-      }
-      return true;
-    }
-
-    return false;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81df7b58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 3eefb8f..5243fb3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -955,7 +955,14 @@ public class FairScheduler extends
         preemptionContainerIds.add(container.getContainerId());
       }
 
-      application.updateBlacklist(blacklistAdditions, blacklistRemovals);
+      if (application.isWaitingForAMContainer(application.getApplicationId())) {
+        // Allocation is for the AM container, so update the AM blacklist
+        application.updateAMBlacklist(
+            blacklistAdditions, blacklistRemovals);
+      } else {
+        application.updateBlacklist(blacklistAdditions, blacklistRemovals);
+      }
+
       ContainersAndNMTokensAllocation allocation =
           application.pullNewlyAllocatedContainersAndNMTokens();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81df7b58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 6b77ceb..99760df 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -352,11 +352,18 @@ public class FifoScheduler extends
         application.showRequests();
 
         LOG.debug("allocate:" +
-            " applicationId=" + applicationAttemptId + 
+            " applicationId=" + applicationAttemptId +
             " #ask=" + ask.size());
       }
 
-      application.updateBlacklist(blacklistAdditions, blacklistRemovals);
+      if (application.isWaitingForAMContainer(application.getApplicationId())) {
+        // Allocation is for the AM container, so update the AM blacklist
+        application.updateAMBlacklist(
+            blacklistAdditions, blacklistRemovals);
+      } else {
+        application.updateBlacklist(blacklistAdditions, blacklistRemovals);
+      }
+
       ContainersAndNMTokensAllocation allocation =
           application.pullNewlyAllocatedContainersAndNMTokens();
       Resource headroom = application.getHeadroom();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81df7b58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 5080355..e464401 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -750,10 +750,7 @@ public class MockRM extends ResourceManager {
 
   public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
       throws Exception {
-    rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
-    RMAppAttempt attempt = app.getCurrentAppAttempt();
-    waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm);
-    rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED);
+    RMAppAttempt attempt = waitForAttemptScheduled(app, rm);
     System.out.println("Launch AM " + attempt.getAppAttemptId());
     nm.nodeHeartbeat(true);
     MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
@@ -761,6 +758,15 @@ public class MockRM extends ResourceManager {
     return am;
   }
 
+  public static RMAppAttempt waitForAttemptScheduled(RMApp app, MockRM rm)
+      throws Exception {
+    rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
+    RMAppAttempt attempt = app.getCurrentAppAttempt();
+    waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm);
+    rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED);
+    return attempt;
+  }
+
   public static MockAM launchAndRegisterAM(RMApp app, MockRM rm, MockNM nm)
       throws Exception {
     MockAM am = launchAM(app, rm, nm);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81df7b58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index d579595..dc843b9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -35,8 +35,12 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -49,11 +53,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.SystemClock;
@@ -82,21 +89,7 @@ public class TestAMRestart {
 
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
     int NUM_CONTAINERS = 3;
-    // allocate NUM_CONTAINERS containers
-    am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
-      new ArrayList<ContainerId>());
-    nm1.nodeHeartbeat(true);
-
-    // wait for containers to be allocated.
-    List<Container> containers =
-        am1.allocate(new ArrayList<ResourceRequest>(),
-          new ArrayList<ContainerId>()).getAllocatedContainers();
-    while (containers.size() != NUM_CONTAINERS) {
-      nm1.nodeHeartbeat(true);
-      containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
-        new ArrayList<ContainerId>()).getAllocatedContainers());
-      Thread.sleep(200);
-    }
+    allocateContainers(nm1, am1, NUM_CONTAINERS);
 
     // launch the 2nd container, for testing running container transferred.
     nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
@@ -244,6 +237,29 @@ public class TestAMRestart {
     rm1.stop();
   }
 
+  private List<Container> allocateContainers(MockNM nm1, MockAM am1,
+      int NUM_CONTAINERS) throws Exception {
+    // allocate NUM_CONTAINERS containers
+    am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
+      new ArrayList<ContainerId>());
+    nm1.nodeHeartbeat(true);
+
+    // wait for containers to be allocated.
+    List<Container> containers =
+        am1.allocate(new ArrayList<ResourceRequest>(),
+          new ArrayList<ContainerId>()).getAllocatedContainers();
+    while (containers.size() != NUM_CONTAINERS) {
+      nm1.nodeHeartbeat(true);
+      containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>()).getAllocatedContainers());
+      Thread.sleep(200);
+    }
+
+    Assert.assertEquals("Did not get all containers allocated",
+        NUM_CONTAINERS, containers.size());
+    return containers;
+  }
+
   private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt)
       throws InterruptedException {
     int count = 0;
@@ -258,6 +274,9 @@ public class TestAMRestart {
   public void testNMTokensRebindOnAMRestart() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3);
+    // To prevent the test from blacklisting nm1 for the AM, we set the
+    // threshold to half of 2 nodes, which is 1
+    conf.setFloat(YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD, 0.5f);
 
     MockRM rm1 = new MockRM(conf);
     rm1.start();
@@ -355,6 +374,106 @@ public class TestAMRestart {
     rm1.stop();
   }
 
+  @Test(timeout = 100000)
+  public void testAMBlacklistPreventsRestartOnSameNode() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    final DrainDispatcher dispatcher = new DrainDispatcher();
+    MockRM rm1 = new MockRM(conf, memStore) {
+      @Override
+      protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
+        return new SchedulerEventDispatcher(this.scheduler) {
+          @Override
+          public void handle(SchedulerEvent event) {
+            scheduler.handle(event);
+          }
+        };
+      }
+
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+
+    rm1.start();
+
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    MockNM nm2 =
+        new MockNM("127.0.0.2:2345", 8000, rm1.getResourceTrackerService());
+    nm2.registerNode();
+
+    RMApp app1 = rm1.submitApp(200);
+
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    CapacityScheduler scheduler =
+        (CapacityScheduler) rm1.getResourceScheduler();
+    ContainerId amContainer =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    // Find the node where the first attempt's AM container ran
+    RMContainer rmContainer = scheduler.getRMContainer(amContainer);
+    NodeId nodeWhereAMRan = rmContainer.getAllocatedNode();
+
+    MockNM currentNode, otherNode;
+    if (nodeWhereAMRan == nm1.getNodeId()) {
+      currentNode = nm1;
+      otherNode = nm2;
+    } else {
+      currentNode = nm2;
+      otherNode = nm1;
+    }
+
+    ContainerStatus containerStatus =
+        BuilderUtils.newContainerStatus(amContainer, ContainerState.COMPLETE,
+            "", ContainerExitStatus.DISKS_FAILED);
+    currentNode.containerStatus(containerStatus);
+    am1.waitForState(RMAppAttemptState.FAILED);
+    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+
+    // restart the am
+    RMAppAttempt attempt = rm1.waitForAttemptScheduled(app1, rm1);
+    System.out.println("Launch AM " + attempt.getAppAttemptId());
+
+
+
+    currentNode.nodeHeartbeat(true);
+    dispatcher.await();
+    Assert.assertEquals(
+        "AppAttemptState should still be SCHEDULED if currentNode is " +
+            "blacklisted correctly",
+        RMAppAttemptState.SCHEDULED,
+        attempt.getAppAttemptState());
+
+    otherNode.nodeHeartbeat(true);
+    dispatcher.await();
+
+    MockAM am2 = rm1.sendAMLaunched(attempt.getAppAttemptId());
+    rm1.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
+
+    amContainer =
+        ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
+    rmContainer = scheduler.getRMContainer(amContainer);
+    nodeWhereAMRan = rmContainer.getAllocatedNode();
+    Assert.assertEquals(
+        "After blacklisting AM should have run on the other node",
+        otherNode.getNodeId(), nodeWhereAMRan);
+
+    am2.registerAppAttempt();
+    rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+
+    List<Container> allocatedContainers =
+        allocateContainers(currentNode, am2, 1);
+    Assert.assertEquals(
+        "Even though AM is blacklisted from the node, application can still " +
+        "allocate containers there",
+        currentNode.getNodeId(), allocatedContainers.get(0).getNodeId());
+  }
+
   // AM container preempted, nm disk failure
   // should not be counted towards AM max retry count.
   @Test(timeout = 100000)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81df7b58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/TestBlacklistManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/TestBlacklistManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/TestBlacklistManager.java
new file mode 100644
index 0000000..96b373f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/TestBlacklistManager.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.blacklist;
+
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+public class TestBlacklistManager {
+
+  @Test
+  public void testSimpleBlacklistBelowFailureThreshold() {
+    final int numberOfNodeManagerHosts = 3;
+    final double blacklistDisableFailureThreshold = 0.8;
+    BlacklistManager manager = new SimpleBlacklistManager(
+        numberOfNodeManagerHosts, blacklistDisableFailureThreshold);
+    String anyNode = "foo";
+    String anyNode2 = "bar";
+    manager.addNode(anyNode);
+    manager.addNode(anyNode2);
+    BlacklistUpdates blacklist = manager
+        .getBlacklistUpdates();
+
+    List<String> blacklistAdditions = blacklist.getAdditions();
+    Collections.sort(blacklistAdditions);
+    List<String> blacklistRemovals = blacklist.getRemovals();
+    String[] expectedBlacklistAdditions = new String[]{anyNode2, anyNode};
+    Assert.assertArrayEquals(
+        "Blacklist additions was not as expected",
+        expectedBlacklistAdditions,
+        blacklistAdditions.toArray());
+    Assert.assertTrue(
+        "Blacklist removals should be empty but was " +
+            blacklistRemovals,
+        blacklistRemovals.isEmpty());
+  }
+
+  @Test
+  public void testSimpleBlacklistAboveFailureThreshold() {
+    // Create a threshold of 0.5 * 3, i.e. at 1.5 node failures.
+    BlacklistManager manager = new SimpleBlacklistManager(3, 0.5);
+    String anyNode = "foo";
+    String anyNode2 = "bar";
+    manager.addNode(anyNode);
+    BlacklistUpdates blacklist = manager
+        .getBlacklistUpdates();
+
+    List<String> blacklistAdditions = blacklist.getAdditions();
+    Collections.sort(blacklistAdditions);
+    List<String> blacklistRemovals = blacklist.getRemovals();
+    String[] expectedBlacklistAdditions = new String[]{anyNode};
+    Assert.assertArrayEquals(
+        "Blacklist additions was not as expected",
+        expectedBlacklistAdditions,
+        blacklistAdditions.toArray());
+    Assert.assertTrue(
+        "Blacklist removals should be empty but was " +
+            blacklistRemovals,
+        blacklistRemovals.isEmpty());
+
+    manager.addNode(anyNode2);
+
+    blacklist = manager
+        .getBlacklistUpdates();
+    blacklistAdditions = blacklist.getAdditions();
+    Collections.sort(blacklistAdditions);
+    blacklistRemovals = blacklist.getRemovals();
+    Collections.sort(blacklistRemovals);
+    String[] expectedBlacklistRemovals = new String[] {anyNode2, anyNode};
+    Assert.assertTrue(
+        "Blacklist additions should be empty but was " +
+            blacklistAdditions,
+        blacklistAdditions.isEmpty());
+    Assert.assertArrayEquals(
+        "Blacklist removals was not as expected",
+        expectedBlacklistRemovals,
+        blacklistRemovals.toArray());
+  }
+
+  @Test
+  public void testDisabledBlacklist() {
+    BlacklistManager disabled = new DisabledBlacklistManager();
+    String anyNode = "foo";
+    disabled.addNode(anyNode);
+    BlacklistUpdates blacklist = disabled
+        .getBlacklistUpdates();
+
+    List<String> blacklistAdditions = blacklist.getAdditions();
+    List<String> blacklistRemovals = blacklist.getRemovals();
+    Assert.assertTrue(
+        "Blacklist additions should be empty but was " +
+            blacklistAdditions,
+        blacklistAdditions.isEmpty());
+    Assert.assertTrue(
+        "Blacklist removals should be empty but was " +
+            blacklistRemovals,
+        blacklistRemovals.isEmpty());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81df7b58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
index fccfa19..484a1b6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
@@ -489,7 +489,7 @@ public class TestRMAppLogAggregationStatus {
           2, Resource.newInstance(10, 2), "test");
     return new RMAppImpl(this.appId, this.rmContext,
       conf, "test", "test", "default", submissionContext,
-      this.rmContext.getScheduler(),
+      scheduler,
       this.rmContext.getApplicationMasterService(),
       System.currentTimeMillis(), "test",
       null, null);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81df7b58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
index 2e64d61..a5e3308 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
@@ -970,7 +970,7 @@ public class TestRMAppTransitions {
             appState.getApplicationSubmissionContext().getApplicationId(),
             rmContext, conf,
             submissionContext.getApplicationName(), null,
-            submissionContext.getQueue(), submissionContext, null, null,
+            submissionContext.getQueue(), submissionContext, scheduler, null,
             appState.getSubmitTime(), submissionContext.getApplicationType(),
             submissionContext.getApplicationTags(),
             BuilderUtils.newResourceRequest(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81df7b58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 44773be..76a1351 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -655,6 +656,11 @@ public class TestCapacityScheduler {
     RMAppImpl app = mock(RMAppImpl.class);
     when(app.getApplicationId()).thenReturn(appId);
     RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
+    Container container = mock(Container.class);
+    when(attempt.getMasterContainer()).thenReturn(container);
+    ApplicationSubmissionContext submissionContext = mock(
+        ApplicationSubmissionContext.class);
+    when(attempt.getSubmissionContext()).thenReturn(submissionContext);
     when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
     when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
     when(app.getCurrentAppAttempt()).thenReturn(attempt);
@@ -715,6 +721,11 @@ public class TestCapacityScheduler {
     RMAppImpl app1 = mock(RMAppImpl.class);
     when(app1.getApplicationId()).thenReturn(appId1);
     RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class);
+    Container container = mock(Container.class);
+    when(attempt1.getMasterContainer()).thenReturn(container);
+    ApplicationSubmissionContext submissionContext = mock(
+        ApplicationSubmissionContext.class);
+    when(attempt1.getSubmissionContext()).thenReturn(submissionContext);
     when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1);
     when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1);
     when(app1.getCurrentAppAttempt()).thenReturn(attempt1);
@@ -739,6 +750,8 @@ public class TestCapacityScheduler {
     RMAppImpl app2 = mock(RMAppImpl.class);
     when(app2.getApplicationId()).thenReturn(appId2);
     RMAppAttemptImpl attempt2 = mock(RMAppAttemptImpl.class);
+    when(attempt2.getMasterContainer()).thenReturn(container);
+    when(attempt2.getSubmissionContext()).thenReturn(submissionContext);
     when(attempt2.getAppAttemptId()).thenReturn(appAttemptId2);
     when(attempt2.getRMAppAttemptMetrics()).thenReturn(attemptMetric2);
     when(app2.getCurrentAppAttempt()).thenReturn(attempt2);
@@ -2876,6 +2889,11 @@ public class TestCapacityScheduler {
     RMAppImpl app = mock(RMAppImpl.class);
     when(app.getApplicationId()).thenReturn(appId);
     RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
+    Container container = mock(Container.class);
+    when(attempt.getMasterContainer()).thenReturn(container);
+    ApplicationSubmissionContext submissionContext = mock(
+        ApplicationSubmissionContext.class);
+    when(attempt.getSubmissionContext()).thenReturn(submissionContext);
     when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
     when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
     when(app.getCurrentAppAttempt()).thenReturn(attempt);
@@ -2953,6 +2971,11 @@ public class TestCapacityScheduler {
     RMAppImpl app = mock(RMAppImpl.class);
     when(app.getApplicationId()).thenReturn(appId);
     RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
+    Container container = mock(Container.class);
+    when(attempt.getMasterContainer()).thenReturn(container);
+    ApplicationSubmissionContext submissionContext = mock(
+        ApplicationSubmissionContext.class);
+    when(attempt.getSubmissionContext()).thenReturn(submissionContext);
     when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
     when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
     when(app.getCurrentAppAttempt()).thenReturn(attempt);
@@ -2976,6 +2999,8 @@ public class TestCapacityScheduler {
     RMAppImpl app2 = mock(RMAppImpl.class);
     when(app2.getApplicationId()).thenReturn(appId2);
     RMAppAttemptImpl attempt2 = mock(RMAppAttemptImpl.class);
+    when(attempt2.getMasterContainer()).thenReturn(container);
+    when(attempt2.getSubmissionContext()).thenReturn(submissionContext);
     when(attempt2.getAppAttemptId()).thenReturn(appAttemptId2);
     when(attempt2.getRMAppAttemptMetrics()).thenReturn(attemptMetric2);
     when(app2.getCurrentAppAttempt()).thenReturn(attempt2);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81df7b58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index 403c8ea..1c9801d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -220,7 +220,7 @@ public class FairSchedulerTestBase {
     ApplicationId appId = attId.getApplicationId();
     RMApp rmApp = new RMAppImpl(appId, rmContext, conf,
         null, user, null, ApplicationSubmissionContext.newInstance(appId, null,
-        queue, null, null, false, false, 0, amResource, null), null, null,
+        queue, null, null, false, false, 0, amResource, null), scheduler, null,
         0, null, null, null);
     rmContext.getRMApps().put(appId, rmApp);
     RMAppEvent event = new RMAppEvent(appId, RMAppEventType.START);