You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gi...@apache.org on 2019/04/11 18:50:07 UTC
[hadoop] branch trunk updated: YARN-9435. Add Opportunistic
Scheduler metrics in ResourceManager. Contributed by Abhishek Modi.
This is an automated email from the ASF dual-hosted git repository.
gifuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new ed3747c YARN-9435. Add Opportunistic Scheduler metrics in ResourceManager. Contributed by Abhishek Modi.
ed3747c is described below
commit ed3747c1ccc303e206de50c2b74f3c318cb1c416
Author: Giovanni Matteo Fumarola <gi...@apache.org>
AuthorDate: Thu Apr 11 11:49:19 2019 -0700
YARN-9435. Add Opportunistic Scheduler metrics in ResourceManager. Contributed by Abhishek Modi.
---
.../metrics/OpportunisticSchedulerMetrics.java | 141 +++++++++++++++++++++
.../scheduler/OpportunisticContainerAllocator.java | 14 ++
.../OpportunisticContainerAllocatorAMService.java | 4 +
.../scheduler/AbstractYarnScheduler.java | 2 +
...stOpportunisticContainerAllocatorAMService.java | 101 +++++++++++++++
5 files changed, 262 insertions(+)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/OpportunisticSchedulerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/OpportunisticSchedulerMetrics.java
new file mode 100644
index 0000000..6169e51
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/OpportunisticSchedulerMetrics.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+
+/**
+ * Singleton metrics source for the Opportunistic Scheduler; see getMetrics().
+ */
+@InterfaceAudience.Private
+@Metrics(context="yarn")
+public class OpportunisticSchedulerMetrics {
+ // CHECKSTYLE:OFF:VisibilityModifier
+ private static AtomicBoolean isInitialized = new AtomicBoolean(false); // set after registration
+
+ private static final MetricsInfo RECORD_INFO =
+ info("OpportunisticSchedulerMetrics",
+ "Metrics for the Yarn Opportunistic Scheduler");
+
+ private static volatile OpportunisticSchedulerMetrics INSTANCE = null;
+ private static MetricsRegistry registry; // created in registerMetrics()
+
+ public static OpportunisticSchedulerMetrics getMetrics() { // lazily creates the instance
+ if(!isInitialized.get()){ // already initialized: skip the class lock
+ synchronized (OpportunisticSchedulerMetrics.class) {
+ if(INSTANCE == null){ // re-check under the lock
+ INSTANCE = new OpportunisticSchedulerMetrics();
+ registerMetrics(); // reads INSTANCE, so it must be assigned first
+ isInitialized.set(true);
+ }
+ }
+ }
+ return INSTANCE;
+ }
+
+ private static void registerMetrics() { // builds the registry, registers INSTANCE
+ registry = new MetricsRegistry(RECORD_INFO);
+ registry.tag(RECORD_INFO, "ResourceManager");
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ if (ms != null) {
+ ms.register("OpportunisticSchedulerMetrics",
+ "Metrics for the Yarn Opportunistic Scheduler", INSTANCE);
+ }
+ }
+
+ @Metric("# of allocated opportunistic containers")
+ MutableGaugeInt allocatedOContainers; // raised on allocation, lowered on release
+ @Metric("Aggregate # of allocated opportunistic containers")
+ MutableCounterLong aggregateOContainersAllocated;
+ @Metric("Aggregate # of released opportunistic containers")
+ MutableCounterLong aggregateOContainersReleased;
+
+ @Metric("Aggregate # of allocated node-local opportunistic containers")
+ MutableCounterLong aggregateNodeLocalOContainersAllocated;
+ @Metric("Aggregate # of allocated rack-local opportunistic containers")
+ MutableCounterLong aggregateRackLocalOContainersAllocated;
+ @Metric("Aggregate # of allocated off-switch opportunistic containers")
+ MutableCounterLong aggregateOffSwitchOContainersAllocated;
+
+ @VisibleForTesting
+ public int getAllocatedContainers() {
+ return allocatedOContainers.value();
+ }
+
+ @VisibleForTesting
+ public long getAggregatedAllocatedContainers() {
+ return aggregateOContainersAllocated.value();
+ }
+
+ @VisibleForTesting
+ public long getAggregatedReleasedContainers() {
+ return aggregateOContainersReleased.value();
+ }
+
+ @VisibleForTesting
+ public long getAggregatedNodeLocalContainers() {
+ return aggregateNodeLocalOContainersAllocated.value();
+ }
+
+ @VisibleForTesting
+ public long getAggregatedRackLocalContainers() {
+ return aggregateRackLocalOContainersAllocated.value();
+ }
+
+ @VisibleForTesting
+ public long getAggregatedOffSwitchContainers() {
+ return aggregateOffSwitchOContainersAllocated.value();
+ }
+
+ // Updaters for the opportunistic container gauge and counters.
+ public void incrAllocatedOppContainers(int numContainers) {
+ allocatedOContainers.incr(numContainers); // gauge of currently allocated containers
+ aggregateOContainersAllocated.incr(numContainers); // running total
+ }
+
+ public void incrReleasedOppContainers(int numContainers) {
+ aggregateOContainersReleased.incr(numContainers); // running total
+ allocatedOContainers.decr(numContainers); // released containers leave the gauge
+ }
+
+ public void incrNodeLocalOppContainers() {
+ aggregateNodeLocalOContainersAllocated.incr();
+ }
+
+ public void incrRackLocalOppContainers() {
+ aggregateRackLocalOContainersAllocated.incr();
+ }
+
+ public void incrOffSwitchOppContainers() {
+ aggregateOffSwitchOContainersAllocated.incr();
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
index 1cec3da..5600aa8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
+import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
@@ -433,6 +434,7 @@ public class OpportunisticContainerAllocator {
idCounter, id, userName, allocations, location,
anyAsk, rNode);
numAllocated++;
+ updateMetrics(loopIndex);
// Try to spread the allocations across the nodes.
// But don't add if it is a node local request.
if (loopIndex != NODE_LOCAL_LOOP) {
@@ -459,6 +461,18 @@ public class OpportunisticContainerAllocator {
}
}
+ private void updateMetrics(int loopIndex) { // bumps one locality counter per call
+ OpportunisticSchedulerMetrics metrics =
+ OpportunisticSchedulerMetrics.getMetrics();
+ if (loopIndex == NODE_LOCAL_LOOP) {
+ metrics.incrNodeLocalOppContainers();
+ } else if (loopIndex == RACK_LOCAL_LOOP) {
+ metrics.incrRackLocalOppContainers();
+ } else { // any other loop index is counted as off-switch
+ metrics.incrOffSwitchOppContainers();
+ }
+ }
+
private Collection<RemoteNode> findNodeCandidates(int loopIndex,
Map<String, RemoteNode> allNodes, Set<String> blackList,
EnrichedResourceRequest enrichedRR) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index f6d23f7..9e861bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -200,6 +201,9 @@ public class OpportunisticContainerAllocatorAMService
// Create RMContainers and update the NMTokens.
if (!oppContainers.isEmpty()) {
+ OpportunisticSchedulerMetrics schedulerMetrics =
+ OpportunisticSchedulerMetrics.getMetrics();
+ schedulerMetrics.incrAllocatedOppContainers(oppContainers.size());
handleNewContainers(oppContainers, false);
appAttempt.updateNMTokens(oppContainers);
ApplicationMasterServiceUtils.addToAllocatedContainers(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 5fd064b..da53530 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
@@ -695,6 +696,7 @@ public abstract class AbstractYarnScheduler
if (node != null) {
node.releaseContainer(rmContainer.getContainerId(), false);
}
+ OpportunisticSchedulerMetrics.getMetrics().incrReleasedOppContainers(1);
}
// If the container is getting killed in ACQUIRED state, the requester (AM
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
index 221181d..9a8c02b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSche
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
+import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -753,6 +754,106 @@ public class TestOpportunisticContainerAllocatorAMService {
}
@Test(timeout = 60000)
+ public void testOpportunisticSchedulerMetrics() throws Exception {
+ HashMap<NodeId, MockNM> nodes = new HashMap<>();
+ MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm1.getNodeId(), nm1);
+ MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm2.getNodeId(), nm2);
+ nm1.registerNode();
+ nm2.registerNode();
+ OpportunisticSchedulerMetrics metrics =
+ OpportunisticSchedulerMetrics.getMetrics();
+
+ int allocContainers = metrics.getAllocatedContainers(); // baselines: asserts check deltas
+ long aggrAllocatedContainers = metrics.getAggregatedAllocatedContainers();
+ long aggrOffSwitchContainers = metrics.getAggregatedOffSwitchContainers();
+ long aggrReleasedContainers = metrics.getAggregatedReleasedContainers();
+
+ OpportunisticContainerAllocatorAMService amservice =
+ (OpportunisticContainerAllocatorAMService) rm
+ .getApplicationMasterService();
+ RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+ ApplicationAttemptId attemptId =
+ app1.getCurrentAppAttempt().getAppAttemptId();
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+ ResourceScheduler scheduler = rm.getResourceScheduler();
+ RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+ RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+ nm1.nodeHeartbeat(true);
+ nm2.nodeHeartbeat(true);
+
+ ((RMNodeImpl) rmNode1)
+ .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+ ((RMNodeImpl) rmNode2)
+ .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+
+ OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
+ .getApplicationAttempt(attemptId).getOpportunisticContainerContext();
+ // Send node-added and node-update events to the AM service.
+ amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
+ amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
+ amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+ // Both nodes (nm1 and nm2) will be applicable for scheduling.
+ nm1.nodeHeartbeat(true);
+ nm2.nodeHeartbeat(true);
+
+ AllocateResponse allocateResponse = am1.allocate(Arrays.asList(
+ ResourceRequest.newInstance(Priority.newInstance(1), "*",
+ Resources.createResource(1 * GB), 2, true, null,
+ ExecutionTypeRequest
+ .newInstance(ExecutionType.OPPORTUNISTIC, true))), null);
+
+ List<Container> allocatedContainers = allocateResponse
+ .getAllocatedContainers();
+ Assert.assertEquals(2, allocatedContainers.size()); // both requested containers
+
+ Assert.assertEquals(allocContainers + 2, metrics.getAllocatedContainers());
+ Assert.assertEquals(aggrAllocatedContainers + 2,
+ metrics.getAggregatedAllocatedContainers());
+ Assert.assertEquals(aggrOffSwitchContainers + 2, // the "*" ask is expected to be off-switch
+ metrics.getAggregatedOffSwitchContainers());
+
+ Container container = allocatedContainers.get(0);
+ MockNM allocNode = nodes.get(container.getNodeId());
+
+ // Start the container in the NM.
+ allocNode.nodeHeartbeat(Arrays.asList(
+ ContainerStatus.newInstance(container.getId(),
+ ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)),
+ true);
+ rm.drainEvents();
+
+ // Verify that the container is actually running from the RM's point of view.
+ RMContainer rmContainer = ((CapacityScheduler) scheduler)
+ .getApplicationAttempt(
+ container.getId().getApplicationAttemptId()).getRMContainer(
+ container.getId());
+ Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState());
+
+ // Report the container as completed in the NM.
+ allocNode.nodeHeartbeat(Arrays.asList(
+ ContainerStatus.newInstance(container.getId(),
+ ExecutionType.OPPORTUNISTIC, ContainerState.COMPLETE, "", 0)),
+ true);
+ rm.drainEvents();
+
+ // Verify that the container has been removed.
+ rmContainer = ((CapacityScheduler) scheduler)
+ .getApplicationAttempt(
+ container.getId().getApplicationAttemptId()).getRMContainer(
+ container.getId());
+ Assert.assertNull(rmContainer);
+
+ Assert.assertEquals(allocContainers + 1, metrics.getAllocatedContainers());
+ Assert.assertEquals(aggrReleasedContainers + 1, // only one of the two completed
+ metrics.getAggregatedReleasedContainers());
+ }
+
+ @Test(timeout = 60000)
public void testAMCrashDuringAllocate() throws Exception {
MockNM nm = new MockNM("h:1234", 4096, rm.getResourceTrackerService());
nm.registerNode();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org