You are viewing a plain text version of this content. The link to the canonical (HTML) version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2015/06/05 18:41:59 UTC

hadoop git commit: YARN-3259. FairScheduler: Trigger fairShare updates on node events. (Anubhav Dhoot via kasha)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 6786daab3 -> 75885852c


YARN-3259. FairScheduler: Trigger fairShare updates on node events. (Anubhav Dhoot via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/75885852
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/75885852
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/75885852

Branch: refs/heads/trunk
Commit: 75885852cc19dd6de12e62498b112d5d70ce87f4
Parents: 6786daa
Author: Karthik Kambatla <ka...@apache.org>
Authored: Fri Jun 5 09:39:41 2015 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Fri Jun 5 09:39:41 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../scheduler/fair/FSOpDurations.java           |   6 +
 .../scheduler/fair/FairScheduler.java           |  23 +++-
 .../scheduler/fair/TestSchedulingUpdate.java    | 135 +++++++++++++++++++
 4 files changed, 163 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/75885852/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 8b922e2..d802703 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -306,6 +306,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3547. FairScheduler: Apps that have no resource demand should not participate 
     scheduling. (Xianyin Xin via kasha)
 
+    YARN-3259. FairScheduler: Trigger fairShare updates on node events. 
+    (Anubhav Dhoot via kasha)
+
   BUG FIXES
 
     YARN-3197. Confusing log generated by CapacityScheduler. (Varun Saxena 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/75885852/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSOpDurations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSOpDurations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSOpDurations.java
index c2282fd..20d2af9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSOpDurations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSOpDurations.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.metrics2.MetricsCollector;
@@ -116,4 +117,9 @@ public class FSOpDurations implements MetricsSource {
   public void addPreemptCallDuration(long value) {
     preemptCall.add(value);
   }
+
+  @VisibleForTesting
+  public boolean hasUpdateThreadRunChanged() {
+    return updateThreadRun.changed();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/75885852/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 07b3271..64b3f12 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -103,9 +103,9 @@ import com.google.common.base.Preconditions;
  * of the root queue in the typical fair scheduling fashion. Then, the children
  * distribute the resources assigned to them to their children in the same
  * fashion.  Applications may only be scheduled on leaf queues. Queues can be
- * specified as children of other queues by placing them as sub-elements of their
- * parents in the fair scheduler configuration file.
- * 
+ * specified as children of other queues by placing them as sub-elements of
+ * their parents in the fair scheduler configuration file.
+ *
  * A queue's name starts with the names of its parents, with periods as
  * separators.  So a queue named "queue1" under the root named, would be 
  * referred to as "root.queue1", and a queue named "queue2" under a queue
@@ -142,6 +142,8 @@ public class FairScheduler extends
   @VisibleForTesting
   Thread updateThread;
 
+  private final Object updateThreadMonitor = new Object();
+
   @VisibleForTesting
   Thread schedulingThread;
   // timeout to join when we stop this service
@@ -246,6 +248,13 @@ public class FairScheduler extends
     return queueMgr;
   }
 
+  // Allows UpdateThread to start processing without waiting till updateInterval
+  void triggerUpdate() {
+    synchronized (updateThreadMonitor) {
+      updateThreadMonitor.notify();
+    }
+  }
+
   /**
    * Thread which calls {@link FairScheduler#update()} every
    * <code>updateInterval</code> milliseconds.
@@ -256,7 +265,9 @@ public class FairScheduler extends
     public void run() {
       while (!Thread.currentThread().isInterrupted()) {
         try {
-          Thread.sleep(updateInterval);
+          synchronized (updateThreadMonitor) {
+            updateThreadMonitor.wait(updateInterval);
+          }
           long start = getClock().getTime();
           update();
           preemptTasksIfNecessary();
@@ -838,6 +849,8 @@ public class FairScheduler extends
     updateRootQueueMetrics();
     updateMaximumAllocation(schedulerNode, true);
 
+    triggerUpdate();
+
     queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
     queueMgr.getRootQueue().recomputeSteadyShares();
     LOG.info("Added node " + node.getNodeAddress() +
@@ -853,6 +866,8 @@ public class FairScheduler extends
     Resources.subtractFrom(clusterResource, rmNode.getTotalCapability());
     updateRootQueueMetrics();
 
+    triggerUpdate();
+
     // Remove running containers
     List<RMContainer> runningContainers = node.getRunningContainers();
     for (RMContainer container : runningContainers) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/75885852/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingUpdate.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingUpdate.java
new file mode 100644
index 0000000..94298f4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingUpdate.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestSchedulingUpdate extends FairSchedulerTestBase {
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+
+    // Make the update loop to never finish to ensure zero update calls
+    conf.setInt(
+        FairSchedulerConfiguration.UPDATE_INTERVAL_MS,
+        Integer.MAX_VALUE);
+    return conf;
+  }
+
+  @Before
+  public void setup() {
+    conf = createConfiguration();
+    resourceManager = new MockRM(conf);
+    resourceManager.start();
+
+    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+  }
+
+  @After
+  public void teardown() {
+    if (resourceManager != null) {
+      resourceManager.stop();
+      resourceManager = null;
+    }
+  }
+
+  @Test (timeout = 3000)
+  public void testSchedulingUpdateOnNodeJoinLeave() throws InterruptedException {
+
+    verifyNoCalls();
+
+    // Add one node
+    String host = "127.0.0.1";
+    final int memory = 4096;
+    final int cores = 4;
+    RMNode node1 = MockNodes.newNodeInfo(
+        1, Resources.createResource(memory, cores), 1, host);
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    long expectedCalls = 1;
+    verifyExpectedCalls(expectedCalls, memory, cores);
+
+    // Remove the node
+    NodeRemovedSchedulerEvent nodeEvent2 = new NodeRemovedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent2);
+
+    expectedCalls = 2;
+    verifyExpectedCalls(expectedCalls, 0, 0);
+  }
+
+  private void verifyExpectedCalls(long expectedCalls, int memory, int vcores)
+    throws InterruptedException {
+    boolean verified = false;
+    int count = 0;
+    while (count < 100) {
+      if (scheduler.fsOpDurations.hasUpdateThreadRunChanged()) {
+        break;
+      }
+      count++;
+      Thread.sleep(10);
+    }
+    assertTrue("Update Thread has not run based on its metrics",
+        scheduler.fsOpDurations.hasUpdateThreadRunChanged());
+    assertEquals("Root queue metrics memory does not have expected value",
+        memory, scheduler.getRootQueueMetrics().getAvailableMB());
+    assertEquals("Root queue metrics cpu does not have expected value",
+        vcores, scheduler.getRootQueueMetrics().getAvailableVirtualCores());
+
+    MetricsCollectorImpl collector = new MetricsCollectorImpl();
+    scheduler.fsOpDurations.getMetrics(collector, true);
+    MetricsRecord record = collector.getRecords().get(0);
+    for (AbstractMetric abstractMetric : record.metrics()) {
+      if (abstractMetric.name().contains("UpdateThreadRunNumOps")) {
+        assertEquals("Update Thread did not run expected number of times " +
+                "based on metric record count",
+            expectedCalls,
+            abstractMetric.value());
+        verified = true;
+      }
+    }
+    assertTrue("Did not find metric for UpdateThreadRunNumOps", verified);
+  }
+
+  private void verifyNoCalls() {
+    assertFalse("Update thread should not have executed",
+        scheduler.fsOpDurations.hasUpdateThreadRunChanged());
+    assertEquals("Scheduler queue memory should not have been updated",
+        0, scheduler.getRootQueueMetrics().getAvailableMB());
+    assertEquals("Scheduler queue cpu should not have been updated",
+        0,scheduler.getRootQueueMetrics().getAvailableVirtualCores());
+  }
+}