You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gi...@apache.org on 2019/04/11 18:50:07 UTC

[hadoop] branch trunk updated: YARN-9435. Add Opportunistic Scheduler metrics in ResourceManager. Contributed by Abhishek Modi.

This is an automated email from the ASF dual-hosted git repository.

gifuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ed3747c  YARN-9435. Add Opportunistic Scheduler metrics in ResourceManager. Contributed by Abhishek Modi.
ed3747c is described below

commit ed3747c1ccc303e206de50c2b74f3c318cb1c416
Author: Giovanni Matteo Fumarola <gi...@apache.org>
AuthorDate: Thu Apr 11 11:49:19 2019 -0700

    YARN-9435. Add Opportunistic Scheduler metrics in ResourceManager. Contributed by Abhishek Modi.
---
 .../metrics/OpportunisticSchedulerMetrics.java     | 141 +++++++++++++++++++++
 .../scheduler/OpportunisticContainerAllocator.java |  14 ++
 .../OpportunisticContainerAllocatorAMService.java  |   4 +
 .../scheduler/AbstractYarnScheduler.java           |   2 +
 ...stOpportunisticContainerAllocatorAMService.java | 101 +++++++++++++++
 5 files changed, 262 insertions(+)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/OpportunisticSchedulerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/OpportunisticSchedulerMetrics.java
new file mode 100644
index 0000000..6169e51
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/OpportunisticSchedulerMetrics.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+
+/**
+ * Process-wide metrics source for the Yarn Opportunistic Scheduler.
+ */
+@InterfaceAudience.Private
+@Metrics(context="yarn")
+public class OpportunisticSchedulerMetrics {
+  // CHECKSTYLE:OFF:VisibilityModifier
+  private static final MetricsInfo RECORD_INFO =
+      info("OpportunisticSchedulerMetrics",
+          "Metrics for the Yarn Opportunistic Scheduler");
+
+  // Set only after registration succeeds, so a failure is retried later.
+  private static volatile OpportunisticSchedulerMetrics INSTANCE = null;
+  private static MetricsRegistry registry;
+
+  /** Returns the singleton, creating and registering it on first use. */
+  public static OpportunisticSchedulerMetrics getMetrics() {
+    if (INSTANCE == null) {
+      synchronized (OpportunisticSchedulerMetrics.class) {
+        if (INSTANCE == null) {
+          INSTANCE = registerMetrics(new OpportunisticSchedulerMetrics());
+        }
+      }
+    }
+    return INSTANCE;
+  }
+
+  /** Tags a fresh registry, registers {@code source}, and returns it. */
+  private static OpportunisticSchedulerMetrics registerMetrics(
+      OpportunisticSchedulerMetrics source) {
+    registry = new MetricsRegistry(RECORD_INFO);
+    registry.tag(RECORD_INFO, "ResourceManager");
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    if (ms != null) {
+      ms.register(RECORD_INFO.name(), RECORD_INFO.description(), source);
+    }
+    return source;
+  }
+
+  @Metric("# of allocated opportunistic containers")
+  MutableGaugeInt allocatedOContainers;
+  @Metric("Aggregate # of allocated opportunistic containers")
+  MutableCounterLong aggregateOContainersAllocated;
+  @Metric("Aggregate # of released opportunistic containers")
+  MutableCounterLong aggregateOContainersReleased;
+
+  @Metric("Aggregate # of allocated node-local opportunistic containers")
+  MutableCounterLong aggregateNodeLocalOContainersAllocated;
+  @Metric("Aggregate # of allocated rack-local opportunistic containers")
+  MutableCounterLong aggregateRackLocalOContainersAllocated;
+  @Metric("Aggregate # of allocated off-switch opportunistic containers")
+  MutableCounterLong aggregateOffSwitchOContainersAllocated;
+
+  @VisibleForTesting
+  public int getAllocatedContainers() {
+    return allocatedOContainers.value();
+  }
+
+  @VisibleForTesting
+  public long getAggregatedAllocatedContainers() {
+    return aggregateOContainersAllocated.value();
+  }
+
+  @VisibleForTesting
+  public long getAggregatedReleasedContainers() {
+    return aggregateOContainersReleased.value();
+  }
+
+  @VisibleForTesting
+  public long getAggregatedNodeLocalContainers() {
+    return aggregateNodeLocalOContainersAllocated.value();
+  }
+
+  @VisibleForTesting
+  public long getAggregatedRackLocalContainers() {
+    return aggregateRackLocalOContainersAllocated.value();
+  }
+
+  @VisibleForTesting
+  public long getAggregatedOffSwitchContainers() {
+    return aggregateOffSwitchOContainersAllocated.value();
+  }
+
+  // Mutators. A release also decrements the live allocated-containers gauge.
+  public void incrAllocatedOppContainers(int numContainers) {
+    allocatedOContainers.incr(numContainers);
+    aggregateOContainersAllocated.incr(numContainers);
+  }
+
+  public void incrReleasedOppContainers(int numContainers) {
+    aggregateOContainersReleased.incr(numContainers);
+    allocatedOContainers.decr(numContainers);
+  }
+
+  public void incrNodeLocalOppContainers() {
+    aggregateNodeLocalOContainersAllocated.incr();
+  }
+
+  public void incrRackLocalOppContainers() {
+    aggregateRackLocalOContainersAllocated.incr();
+  }
+
+  public void incrOffSwitchOppContainers() {
+    aggregateOffSwitchOContainersAllocated.incr();
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
index 1cec3da..5600aa8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ContainerType;
 
 import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
+import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
 import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
@@ -433,6 +434,7 @@ public class OpportunisticContainerAllocator {
             idCounter, id, userName, allocations, location,
             anyAsk, rNode);
         numAllocated++;
+        updateMetrics(loopIndex);
         // Try to spread the allocations across the nodes.
         // But don't add if it is a node local request.
         if (loopIndex != NODE_LOCAL_LOOP) {
@@ -459,6 +461,19 @@ public class OpportunisticContainerAllocator {
     }
   }
 
+  /** Counts one allocation under the locality bucket for loopIndex. */
+  private void updateMetrics(int loopIndex) {
+    final OpportunisticSchedulerMetrics oppMetrics =
+        OpportunisticSchedulerMetrics.getMetrics();
+    if (loopIndex != NODE_LOCAL_LOOP && loopIndex != RACK_LOCAL_LOOP) {
+      oppMetrics.incrOffSwitchOppContainers();
+    } else if (loopIndex == NODE_LOCAL_LOOP) {
+      oppMetrics.incrNodeLocalOppContainers();
+    } else {
+      oppMetrics.incrRackLocalOppContainers();
+    }
+  }
+
   private Collection<RemoteNode> findNodeCandidates(int loopIndex,
       Map<String, RemoteNode> allNodes, Set<String> blackList,
       EnrichedResourceRequest enrichedRR) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index f6d23f7..9e861bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -200,6 +201,9 @@ public class OpportunisticContainerAllocatorAMService
 
       // Create RMContainers and update the NMTokens.
       if (!oppContainers.isEmpty()) {
+        OpportunisticSchedulerMetrics schedulerMetrics =
+            OpportunisticSchedulerMetrics.getMetrics();
+        schedulerMetrics.incrAllocatedOppContainers(oppContainers.size());
         handleNewContainers(oppContainers, false);
         appAttempt.updateNMTokens(oppContainers);
         ApplicationMasterServiceUtils.addToAllocatedContainers(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 5fd064b..da53530 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
@@ -695,6 +696,7 @@ public abstract class AbstractYarnScheduler
       if (node != null) {
         node.releaseContainer(rmContainer.getContainerId(), false);
       }
+      OpportunisticSchedulerMetrics.getMetrics().incrReleasedOppContainers(1);
     }
 
     // If the container is getting killed in ACQUIRED state, the requester (AM
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
index 221181d..9a8c02b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSche
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
+import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -753,6 +754,106 @@ public class TestOpportunisticContainerAllocatorAMService {
   }
 
   @Test(timeout = 60000)
+  public void testOpportunisticSchedulerMetrics() throws Exception {
+    // Allocates two OPPORTUNISTIC containers, completes one of them, and
+    // checks that the opportunistic scheduler metrics move accordingly.
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    nm1.registerNode();
+    nm2.registerNode();
+    OpportunisticSchedulerMetrics metrics =
+        OpportunisticSchedulerMetrics.getMetrics();
+    // Metrics are a JVM-wide singleton, so record baselines and use deltas.
+    int allocContainers = metrics.getAllocatedContainers();
+    long aggrAllocatedContainers = metrics.getAggregatedAllocatedContainers();
+    long aggrOffSwitchContainers = metrics.getAggregatedOffSwitchContainers();
+    long aggrReleasedContainers = metrics.getAggregatedReleasedContainers();
+
+    OpportunisticContainerAllocatorAMService amservice =
+        (OpportunisticContainerAllocatorAMService) rm
+            .getApplicationMasterService();
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+    ResourceScheduler scheduler = rm.getResourceScheduler();
+    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+
+    ((RMNodeImpl) rmNode1)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+    ((RMNodeImpl) rmNode2)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+
+    // Send node-added and node-update events directly to the AM service.
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // Heartbeat again; both nodes should now be eligible for scheduling.
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+    // Request two 1 GB OPPORTUNISTIC containers at any location ("*").
+    AllocateResponse allocateResponse = am1.allocate(Arrays.asList(
+        ResourceRequest.newInstance(Priority.newInstance(1), "*",
+            Resources.createResource(1 * GB), 2, true, null,
+            ExecutionTypeRequest
+                .newInstance(ExecutionType.OPPORTUNISTIC, true))), null);
+
+    List<Container> allocatedContainers = allocateResponse
+        .getAllocatedContainers();
+    Assert.assertEquals(2, allocatedContainers.size());
+
+    // Both allocations should be counted, and both as off-switch.
+    Assert.assertEquals(allocContainers + 2, metrics.getAllocatedContainers());
+    Assert.assertEquals(aggrAllocatedContainers + 2,
+        metrics.getAggregatedAllocatedContainers());
+    Assert.assertEquals(aggrOffSwitchContainers + 2,
+        metrics.getAggregatedOffSwitchContainers());
+
+    Container container = allocatedContainers.get(0);
+    MockNM allocNode = nodes.get(container.getNodeId());
+
+    // Report the first container as RUNNING from the NM hosting it.
+    allocNode.nodeHeartbeat(Arrays.asList(
+        ContainerStatus.newInstance(container.getId(),
+            ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)),
+        true);
+    rm.drainEvents();
+
+    // The RM should now see that container as RUNNING.
+    RMContainer rmContainer = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(
+            container.getId().getApplicationAttemptId()).getRMContainer(
+            container.getId());
+    Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState());
+
+    // Report the same container as COMPLETE from its NM.
+    allocNode.nodeHeartbeat(Arrays.asList(
+        ContainerStatus.newInstance(container.getId(),
+            ExecutionType.OPPORTUNISTIC, ContainerState.COMPLETE, "", 0)),
+        true);
+    rm.drainEvents();
+
+    // Completion should remove the container from the app attempt.
+    rmContainer = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(
+            container.getId().getApplicationAttemptId()).getRMContainer(
+            container.getId());
+    Assert.assertNull(rmContainer);
+
+    // One release: the live gauge drops by one, the released total rises.
+    Assert.assertEquals(allocContainers + 1, metrics.getAllocatedContainers());
+    Assert.assertEquals(aggrReleasedContainers + 1,
+        metrics.getAggregatedReleasedContainers());
+  }
+
+  @Test(timeout = 60000)
   public void testAMCrashDuringAllocate() throws Exception {
     MockNM nm = new MockNM("h:1234", 4096, rm.getResourceTrackerService());
     nm.registerNode();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org