You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/12/07 00:53:57 UTC

[12/50] [abbrv] hadoop git commit: YARN-6124. Make SchedulingEditPolicy can be enabled / disabled / updated with RMAdmin -refreshQueues. (Zian Chen via wangda)

YARN-6124. Make SchedulingEditPolicy can be enabled / disabled / updated with RMAdmin -refreshQueues. (Zian Chen via wangda)

Change-Id: Id93656f3af7dcd78cafa94e33663c78d410d43c2


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a63d19d3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a63d19d3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a63d19d3

Branch: refs/heads/HDFS-7240
Commit: a63d19d36520fa55bf523483f14329756f6eadd3
Parents: 0780fdb
Author: Wangda Tan <wa...@apache.org>
Authored: Thu Nov 30 15:56:53 2017 -0800
Committer: Wangda Tan <wa...@apache.org>
Committed: Thu Nov 30 15:57:22 2017 -0800

----------------------------------------------------------------------
 .../server/resourcemanager/AdminService.java    |  21 ++-
 .../server/resourcemanager/ResourceManager.java |  31 +---
 .../monitor/SchedulingMonitor.java              |   3 +-
 .../monitor/SchedulingMonitorManager.java       | 184 +++++++++++++++++++
 .../scheduler/AbstractYarnScheduler.java        |  25 ++-
 .../scheduler/capacity/CapacityScheduler.java   |   6 +
 .../scheduler/fair/FairScheduler.java           |   6 +
 .../scheduler/fifo/FifoScheduler.java           |   6 +
 .../server/resourcemanager/RMHATestBase.java    |  30 ++-
 .../monitor/TestSchedulingMonitor.java          |  41 +++++
 ...estProportionalCapacityPreemptionPolicy.java |  22 ++-
 .../TestCapacitySchedulerLazyPreemption.java    |  36 +++-
 ...TestCapacitySchedulerSurgicalPreemption.java |  40 +++-
 13 files changed, 391 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a63d19d3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index 6c0a854..accf901 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -400,14 +400,31 @@ public class AdminService extends CompositeService implements
     }
   }
 
+  protected Configuration loadNewConfiguration()
+      throws IOException, YarnException {
+    // Retrieve yarn-site.xml in order to refresh scheduling monitor properties.
+    Configuration conf = getConfiguration(new Configuration(false),
+        YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
+    // The reason we call Configuration#size() is because when getConfiguration
+    // been called, it invokes Configuration#addResouce, which invokes
+    // Configuration#reloadConfiguration which triggers the reload process in a
+    // lazy way, the properties will only be reload when it's needed rather than
+    // reload it right after getConfiguration been called. So here we call
+    // Configuration#size() to force the Configuration#getProps been called to
+    // reload all the properties.
+    conf.size();
+    return conf;
+  }
+
   @Private
   public void refreshQueues() throws IOException, YarnException {
-    rm.getRMContext().getScheduler().reinitialize(getConfig(),
+    Configuration conf = loadNewConfiguration();
+    rm.getRMContext().getScheduler().reinitialize(conf,
         this.rm.getRMContext());
     // refresh the reservation system
     ReservationSystem rSystem = rm.getRMContext().getReservationSystem();
     if (rSystem != null) {
-      rSystem.reinitialize(getConfig(), rm.getRMContext());
+      rSystem.reinitialize(conf, rm.getRMContext());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a63d19d3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 6f8a0a4..a0317f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.curator.framework.AuthInfo;
@@ -67,8 +68,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.metrics.NoOpSystemMetricPub
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
@@ -113,8 +112,6 @@ import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
 import org.eclipse.jetty.webapp.WebAppContext;
 
-import com.google.common.annotations.VisibleForTesting;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintStream;
@@ -711,8 +708,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
         }
       }
 
-      createSchedulerMonitors();
-
       masterService = createApplicationMasterService();
       addService(masterService) ;
       rmContext.setApplicationMasterService(masterService);
@@ -811,30 +806,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
       }
 
     }
-
-    protected void createSchedulerMonitors() {
-      if (conf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
-          YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS)) {
-        LOG.info("Loading policy monitors");
-        List<SchedulingEditPolicy> policies = conf.getInstances(
-            YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
-            SchedulingEditPolicy.class);
-        if (policies.size() > 0) {
-          for (SchedulingEditPolicy policy : policies) {
-            LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName());
-            // periodically check whether we need to take action to guarantee
-            // constraints
-            SchedulingMonitor mon = new SchedulingMonitor(rmContext, policy);
-            addService(mon);
-          }
-        } else {
-          LOG.warn("Policy monitors configured (" +
-              YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS +
-              ") but none specified (" +
-              YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES + ")");
-        }
-      }
-    }
   }
 
   @Private

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a63d19d3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
index 2a741ed..09edb98 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
@@ -27,7 +27,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -58,6 +57,7 @@ public class SchedulingMonitor extends AbstractService {
   }
 
   public void serviceInit(Configuration conf) throws Exception {
+    LOG.info("Initializing SchedulingMonitor=" + getName());
     scheduleEditPolicy.init(conf, rmContext, rmContext.getScheduler());
     this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
     super.serviceInit(conf);
@@ -65,6 +65,7 @@ public class SchedulingMonitor extends AbstractService {
 
   @Override
   public void serviceStart() throws Exception {
+    LOG.info("Starting SchedulingMonitor=" + getName());
     assert !stopped : "starting when already stopped";
     ses = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
       public Thread newThread(Runnable r) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a63d19d3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitorManager.java
new file mode 100644
index 0000000..0cc700d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitorManager.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Manages scheduling monitors.
+ */
+public class SchedulingMonitorManager {
+  private static final Log LOG = LogFactory.getLog(
+      SchedulingMonitorManager.class);
+
+  private Map<String, SchedulingMonitor> runningSchedulingMonitors =
+      new HashMap<>();
+  private RMContext rmContext;
+
+  private void updateSchedulingMonitors(Configuration conf,
+      boolean startImmediately) throws YarnException {
+    boolean monitorsEnabled = conf.getBoolean(
+        YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS);
+
+    if (!monitorsEnabled) {
+      if (!runningSchedulingMonitors.isEmpty()) {
+        // If monitors disabled while we have some running monitors, we should
+        // stop them.
+        LOG.info("Scheduling Monitor disabled, stopping all services");
+        stopAndRemoveAll();
+      }
+
+      return;
+    }
+
+    // When monitor is enabled, loading policies
+    String[] configuredPolicies = conf.getStrings(
+        YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES);
+    if (configuredPolicies == null || configuredPolicies.length == 0) {
+      return;
+    }
+
+    Set<String> configurePoliciesSet = new HashSet<>();
+    for (String s : configuredPolicies) {
+      configurePoliciesSet.add(s);
+    }
+
+    // Add new monitor when needed
+    for (String s : configurePoliciesSet) {
+      if (!runningSchedulingMonitors.containsKey(s)) {
+        Class<?> policyClass;
+        try {
+          policyClass = Class.forName(s);
+        } catch (ClassNotFoundException e) {
+          String message = "Failed to find class of specified policy=" + s;
+          LOG.warn(message);
+          throw new YarnException(message);
+        }
+
+        if (SchedulingEditPolicy.class.isAssignableFrom(policyClass)) {
+          SchedulingEditPolicy policyInstance =
+              (SchedulingEditPolicy) ReflectionUtils.newInstance(policyClass,
+                  null);
+          SchedulingMonitor mon = new SchedulingMonitor(rmContext,
+              policyInstance);
+          mon.init(conf);
+          if (startImmediately) {
+            mon.start();
+          }
+          runningSchedulingMonitors.put(s, mon);
+        } else {
+          String message =
+              "Specified policy=" + s + " is not a SchedulingEditPolicy class.";
+          LOG.warn(message);
+          throw new YarnException(message);
+        }
+      }
+    }
+
+    // Stop monitor when needed.
+    Set<String> disabledPolicies = Sets.difference(
+        runningSchedulingMonitors.keySet(), configurePoliciesSet);
+    for (String disabledPolicy : disabledPolicies) {
+      LOG.info("SchedulingEditPolicy=" + disabledPolicy
+          + " removed, stopping it now ...");
+      silentlyStopSchedulingMonitor(disabledPolicy);
+      runningSchedulingMonitors.remove(disabledPolicy);
+    }
+  }
+
+  public synchronized void initialize(RMContext rmContext,
+      Configuration configuration) throws YarnException {
+    this.rmContext = rmContext;
+    stopAndRemoveAll();
+
+    updateSchedulingMonitors(configuration, false);
+  }
+
+  public synchronized void reinitialize(RMContext rmContext,
+      Configuration configuration) throws YarnException {
+    this.rmContext = rmContext;
+
+    updateSchedulingMonitors(configuration, true);
+  }
+
+  public synchronized void startAll() {
+    for (SchedulingMonitor schedulingMonitor : runningSchedulingMonitors
+        .values()) {
+      schedulingMonitor.start();
+    }
+  }
+
+  private void silentlyStopSchedulingMonitor(String name) {
+    SchedulingMonitor mon = runningSchedulingMonitors.get(name);
+    try {
+      mon.stop();
+      LOG.info("Sucessfully stopped monitor=" + mon.getName());
+    } catch (Exception e) {
+      LOG.warn("Exception while stopping monitor=" + mon.getName(), e);
+    }
+  }
+
+  private void stopAndRemoveAll() {
+    if (!runningSchedulingMonitors.isEmpty()) {
+      for (String schedulingMonitorName : runningSchedulingMonitors
+          .keySet()) {
+        silentlyStopSchedulingMonitor(schedulingMonitorName);
+      }
+      runningSchedulingMonitors.clear();
+    }
+  }
+
+  public boolean isRSMEmpty() {
+    return runningSchedulingMonitors.isEmpty();
+  }
+
+  public boolean isSameConfiguredPolicies(Set<String> configurePoliciesSet) {
+    return configurePoliciesSet.equals(runningSchedulingMonitors.keySet());
+  }
+
+  public SchedulingMonitor getAvailableSchedulingMonitor() {
+    if (isRSMEmpty()) {
+      return null;
+    }
+    for (SchedulingMonitor smon : runningSchedulingMonitors.values()) {
+      if (smon.getSchedulingEditPolicy()
+          instanceof ProportionalCapacityPreemptionPolicy) {
+        return smon;
+      }
+    }
+    return null;
+  }
+
+  public synchronized void stop() throws YarnException {
+    stopAndRemoveAll();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a63d19d3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index e818dab..4749c3d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -168,6 +169,8 @@ public abstract class AbstractYarnScheduler
   // the NM in the next heartbeat.
   private boolean autoUpdateContainers = false;
 
+  protected SchedulingMonitorManager schedulingMonitorManager;
+
   /**
    * Construct the service.
    *
@@ -207,8 +210,8 @@ public abstract class AbstractYarnScheduler
           new RMCriticalThreadUncaughtExceptionHandler(rmContext));
       updateThread.setDaemon(true);
     }
-
     super.serviceInit(conf);
+
   }
 
   @Override
@@ -216,6 +219,7 @@ public abstract class AbstractYarnScheduler
     if (updateThread != null) {
       updateThread.start();
     }
+    schedulingMonitorManager.startAll();
     super.serviceStart();
   }
 
@@ -225,6 +229,9 @@ public abstract class AbstractYarnScheduler
       updateThread.interrupt();
       updateThread.join(THREAD_JOIN_TIMEOUT_MS);
     }
+    if (schedulingMonitorManager != null) {
+      schedulingMonitorManager.stop();
+    }
     super.serviceStop();
   }
 
@@ -233,6 +240,11 @@ public abstract class AbstractYarnScheduler
     return nodeTracker;
   }
 
+  @VisibleForTesting
+  public SchedulingMonitorManager getSchedulingMonitorManager() {
+    return schedulingMonitorManager;
+  }
+
   /*
    * YARN-3136 removed synchronized lock for this method for performance
    * purposes
@@ -1415,4 +1427,15 @@ public abstract class AbstractYarnScheduler
       updateThreadMonitor.notify();
     }
   }
+
+  @Override
+  public void reinitialize(Configuration conf, RMContext rmContext)
+      throws IOException {
+    try {
+      LOG.info("Reinitializing SchedulingMonitorManager ...");
+      schedulingMonitorManager.reinitialize(rmContext, conf);
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a63d19d3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 218adf3..de93a6a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
 import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory;
@@ -390,6 +391,9 @@ public class CapacityScheduler extends
     Configuration configuration = new Configuration(conf);
     super.serviceInit(conf);
     initScheduler(configuration);
+    // Initialize SchedulingMonitorManager
+    schedulingMonitorManager = new SchedulingMonitorManager();
+    schedulingMonitorManager.initialize(rmContext, conf);
   }
 
   @Override
@@ -444,6 +448,8 @@ public class CapacityScheduler extends
 
       // Setup how many containers we can allocate for each round
       offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit();
+
+      super.reinitialize(newConf, rmContext);
     } finally {
       writeLock.unlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a63d19d3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 625009d..ebc7222 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -1352,6 +1353,10 @@ public class FairScheduler extends
   public void serviceInit(Configuration conf) throws Exception {
     initScheduler(conf);
     super.serviceInit(conf);
+
+    // Initialize SchedulingMonitorManager
+    schedulingMonitorManager = new SchedulingMonitorManager();
+    schedulingMonitorManager.initialize(rmContext, conf);
   }
 
   @Override
@@ -1389,6 +1394,7 @@ public class FairScheduler extends
       throws IOException {
     try {
       allocsLoader.reloadAllocations();
+      super.reinitialize(conf, rmContext);
     } catch (Exception e) {
       LOG.error("Failed to reload allocations file", e);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a63d19d3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 3288912..826575d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -255,6 +256,10 @@ public class FifoScheduler extends
   public void serviceInit(Configuration conf) throws Exception {
     initScheduler(conf);
     super.serviceInit(conf);
+
+    // Initialize SchedulingMonitorManager
+    schedulingMonitorManager = new SchedulingMonitorManager();
+    schedulingMonitorManager.initialize(rmContext, conf);
   }
 
   @Override
@@ -312,6 +317,7 @@ public class FifoScheduler extends
       reinitialize(Configuration conf, RMContext rmContext) throws IOException
   {
     setConf(conf);
+    super.reinitialize(conf, rmContext);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a63d19d3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
index 4ac4fc3..439a449 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
@@ -105,9 +105,35 @@ public abstract class RMHATestBase extends ClientBaseWithFixes{
     return am;
   }
 
+  private MockRM initMockRMWithOldConf(Configuration confForRM1) {
+    return new MockRM(confForRM1, null, false, false) {
+      @Override
+      protected AdminService createAdminService() {
+        return new AdminService(this) {
+          @Override
+          protected void startServer() {
+            // override to not start rpc handler
+          }
+
+          @Override
+          protected void stopServer() {
+            // don't do anything
+          }
+
+          @Override
+          protected Configuration loadNewConfiguration()
+              throws IOException, YarnException {
+            return confForRM1;
+          }
+        };
+      }
+    };
+  }
+
   protected void startRMs() throws IOException {
-    rm1 = new MockRM(confForRM1, null, false, false);
-    rm2 = new MockRM(confForRM2, null, false, false);
+    rm1 = initMockRMWithOldConf(confForRM1);
+    rm2 = initMockRMWithOldConf(confForRM2);
+
     startRMs(rm1, confForRM1, rm2, confForRM2);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a63d19d3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java
index c38236d..84126c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java
@@ -23,8 +23,15 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.junit.Test;
 
+import java.util.HashSet;
+import java.util.Set;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
@@ -51,4 +58,38 @@ public class TestSchedulingMonitor {
     monitor.close();
     rm.close();
   }
+
+  @Test(timeout = 10000)
+  public void testRMUpdateSchedulingEditPolicy() throws Exception {
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    SchedulingMonitorManager smm = cs.getSchedulingMonitorManager();
+
+    // runningSchedulingMonitors should not be empty when initialize RM
+    // scheduler monitor
+    cs.reinitialize(conf, rm.getRMContext());
+    assertFalse(smm.isRSMEmpty());
+
+    // make sure runningSchedulingPolicies contains all the configured policy
+    // in YARNConfiguration
+    String[] configuredPolicies = conf.getStrings(
+        YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES);
+    Set<String> configurePoliciesSet = new HashSet<>();
+    for (String s : configuredPolicies) {
+      configurePoliciesSet.add(s);
+    }
+    assertTrue(smm.isSameConfiguredPolicies(configurePoliciesSet));
+
+    // disable RM scheduler monitor
+    conf.setBoolean(
+        YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS);
+    cs.reinitialize(conf, rm.getRMContext());
+    assertTrue(smm.isRSMEmpty());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a63d19d3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 694be09..f0ca466 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
@@ -48,7 +49,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preempti
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.util.Clock;
@@ -792,21 +792,23 @@ public class TestProportionalCapacityPreemptionPolicy {
     @SuppressWarnings("resource")
     MockRM rm = new MockRM(conf);
     rm.init(conf);
-    
+
     // ProportionalCapacityPreemptionPolicy should be initialized after
     // CapacityScheduler initialized. We will 
     // 1) find SchedulingMonitor from RMActiveService's service list, 
     // 2) check if ResourceCalculator in policy is null or not. 
     // If it's not null, we can come to a conclusion that policy initialized
     // after scheduler got initialized
-    for (Service service : rm.getRMActiveService().getServices()) {
-      if (service instanceof SchedulingMonitor) {
-        ProportionalCapacityPreemptionPolicy policy =
-            (ProportionalCapacityPreemptionPolicy) ((SchedulingMonitor) service)
-                .getSchedulingEditPolicy();
-        assertNotNull(policy.getResourceCalculator());
-        return;
-      }
+    // Get SchedulingMonitor from SchedulingMonitorManager instead
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    SchedulingMonitorManager smm = cs.getSchedulingMonitorManager();
+    Service service = smm.getAvailableSchedulingMonitor();
+    if (service instanceof SchedulingMonitor) {
+      ProportionalCapacityPreemptionPolicy policy =
+          (ProportionalCapacityPreemptionPolicy) ((SchedulingMonitor) service)
+              .getSchedulingEditPolicy();
+      assertNotNull(policy.getResourceCalculator());
+      return;
     }
     
     fail("Failed to find SchedulingMonitor service, please check what happened");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a63d19d3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java
index 4e4e3c2..a4c7d61 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -126,7 +128,11 @@ public class TestCapacitySchedulerLazyPreemption
             Resources.createResource(1 * GB), 1)), null);
 
     // Get edit policy and do one update
-    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+    SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
+        getResourceScheduler()).getSchedulingMonitorManager();
+    SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
 
     // Call edit schedule twice, and check if one container from app1 marked
     // to be "killable"
@@ -209,7 +215,11 @@ public class TestCapacitySchedulerLazyPreemption
             Resources.createResource(1 * GB), 1)), null);
 
     // Get edit policy and do one update
-    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+    SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
+        getResourceScheduler()).getSchedulingMonitorManager();
+    SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
 
     // Call edit schedule twice, and check if one container from app1 marked
     // to be "killable"
@@ -301,7 +311,11 @@ public class TestCapacitySchedulerLazyPreemption
             Resources.createResource(1 * GB), 1, false)), null);
 
     // Get edit policy and do one update
-    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+    SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
+        getResourceScheduler()).getSchedulingMonitorManager();
+    SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
 
     // Call edit schedule twice, and check if one container from app1 marked
     // to be "killable"
@@ -387,8 +401,11 @@ public class TestCapacitySchedulerLazyPreemption
     am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>());
 
     // Get edit policy and do one update
+    SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
+        getResourceScheduler()).getSchedulingMonitorManager();
+    SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
     ProportionalCapacityPreemptionPolicy editPolicy =
-        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+        (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
 
     // Call edit schedule twice, and check if one container from app1 marked
     // to be "killable"
@@ -487,8 +504,11 @@ public class TestCapacitySchedulerLazyPreemption
     am2.allocate("*", 3 * GB, 1, new ArrayList<ContainerId>());
 
     // Get edit policy and do one update
+    SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
+        getResourceScheduler()).getSchedulingMonitorManager();
+    SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
     ProportionalCapacityPreemptionPolicy editPolicy =
-        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+        (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
 
     // Call edit schedule twice, and check if 3 container from app1 marked
     // to be "killable"
@@ -582,7 +602,11 @@ public class TestCapacitySchedulerLazyPreemption
             Resources.createResource(1 * GB), 1)), null);
 
     // Get edit policy and do one update
-    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+    SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
+        getResourceScheduler()).getSchedulingMonitorManager();
+    SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
 
     // Call edit schedule twice, and check if no container from app1 marked
     // to be "killable"

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a63d19d3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
index 9146373..8a7e03f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
@@ -26,7 +26,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -138,7 +139,11 @@ public class TestCapacitySchedulerSurgicalPreemption
     Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
 
     // Get edit policy and do one update
-    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+    SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
+        getResourceScheduler()).getSchedulingMonitorManager();
+    SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
 
     // Call edit schedule twice, and check if 4 containers from app1 at n1 killed
     editPolicy.editSchedule();
@@ -217,8 +222,11 @@ public class TestCapacitySchedulerSurgicalPreemption
         ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
 
     // Call editSchedule: containers are selected to be preemption candidate
+    SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
+        getResourceScheduler()).getSchedulingMonitorManager();
+    SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
     ProportionalCapacityPreemptionPolicy editPolicy =
-        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+        (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
     editPolicy.editSchedule();
     Assert.assertEquals(3, editPolicy.getToPreemptContainers().size());
 
@@ -323,8 +331,11 @@ public class TestCapacitySchedulerSurgicalPreemption
     }
 
     // Call editSchedule immediately: containers are not selected
+    SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
+        getResourceScheduler()).getSchedulingMonitorManager();
+    SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
     ProportionalCapacityPreemptionPolicy editPolicy =
-        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+        (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
     editPolicy.editSchedule();
     Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
 
@@ -434,8 +445,11 @@ public class TestCapacitySchedulerSurgicalPreemption
         cs.getNode(rmNode3.getNodeID()).getReservedContainer());
 
     // Call editSchedule immediately: nothing happens
+    SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
+        getResourceScheduler()).getSchedulingMonitorManager();
+    SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
     ProportionalCapacityPreemptionPolicy editPolicy =
-        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+        (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
     editPolicy.editSchedule();
     Assert.assertNotNull(
         cs.getNode(rmNode3.getNodeID()).getReservedContainer());
@@ -562,8 +576,11 @@ public class TestCapacitySchedulerSurgicalPreemption
     // 6 (selected) + 1 (allocated) which makes target capacity to 70%
     Thread.sleep(1000);
 
+    SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
+        getResourceScheduler()).getSchedulingMonitorManager();
+    SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
     ProportionalCapacityPreemptionPolicy editPolicy =
-        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+        (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
     editPolicy.editSchedule();
     checkNumberOfPreemptionCandidateFromApp(editPolicy, 6,
         am1.getApplicationAttemptId());
@@ -715,8 +732,11 @@ public class TestCapacitySchedulerSurgicalPreemption
     Thread.sleep(1000);
 
     /* 1st container preempted is on n2 */
+    SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
+        getResourceScheduler()).getSchedulingMonitorManager();
+    SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
     ProportionalCapacityPreemptionPolicy editPolicy =
-        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+        (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
     editPolicy.editSchedule();
 
     // We should have one to-preempt container, on node[2]
@@ -887,7 +907,11 @@ public class TestCapacitySchedulerSurgicalPreemption
     waitNumberOfReservedContainersFromApp(schedulerApp2, 1);
 
     // Call editSchedule twice and allocation once, container should get allocated
-    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+    SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
+        getResourceScheduler()).getSchedulingMonitorManager();
+    SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
     editPolicy.editSchedule();
     editPolicy.editSchedule();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org