You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by qu...@apache.org on 2022/03/29 07:58:50 UTC
[hadoop] branch trunk updated: YARN-10549. Decouple RM runner logic from SLSRunner. Contributed by Szilard Nemeth.
This is an automated email from the ASF dual-hosted git repository.
quapaw pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new e386d6a YARN-10549. Decouple RM runner logic from SLSRunner. Contributed by Szilard Nemeth.
e386d6a is described below
commit e386d6a6617450b827745ee6a0a856166d37d450
Author: 9uapaw <gy...@gmail.com>
AuthorDate: Tue Mar 29 09:52:39 2022 +0200
YARN-10549. Decouple RM runner logic from SLSRunner. Contributed by Szilard Nemeth.
---
.../java/org/apache/hadoop/yarn/sls/AMRunner.java | 1 +
.../java/org/apache/hadoop/yarn/sls/RMRunner.java | 137 +++++++++++++++++++++
.../java/org/apache/hadoop/yarn/sls/SLSRunner.java | 123 ++++--------------
3 files changed, 164 insertions(+), 97 deletions(-)
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java
index da95c68..301b426 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java
@@ -151,6 +151,7 @@ public class AMRunner {
// if we use the nodeFile this could have been not initialized yet.
if (stjp == null) {
stjp = new SynthTraceJobProducer(conf, new Path(inputTraces[0]));
+ slsRunner.setStjp(stjp);
}
SynthJob job;
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/RMRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/RMRunner.java
new file mode 100644
index 0000000..dbded4b
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/RMRunner.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.TableMapping;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher;
+import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler;
+import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler;
+import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
+import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
+import java.util.HashMap;
+import java.util.Map;
+
+public class RMRunner {
+ private ResourceManager rm;
+ private String metricsOutputDir;
+ private Configuration conf;
+ private SLSRunner slsRunner;
+ private String tableMapping;
+ private Map<String, Integer> queueAppNumMap;
+
+ public RMRunner(Configuration conf, SLSRunner slsRunner) {
+ this.conf = conf;
+ this.slsRunner = slsRunner;
+ this.queueAppNumMap = new HashMap<>();
+ }
+
+ public void startRM() throws ClassNotFoundException, YarnException {
+ Configuration rmConf = new YarnConfiguration(conf);
+ String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER);
+
+ if (Class.forName(schedulerClass) == CapacityScheduler.class) {
+ rmConf.set(YarnConfiguration.RM_SCHEDULER,
+ SLSCapacityScheduler.class.getName());
+ rmConf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+ rmConf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+ ProportionalCapacityPreemptionPolicy.class.getName());
+ } else if (Class.forName(schedulerClass) == FairScheduler.class) {
+ rmConf.set(YarnConfiguration.RM_SCHEDULER,
+ SLSFairScheduler.class.getName());
+ } else if (Class.forName(schedulerClass) == FifoScheduler.class) {
+ // TODO add support for FifoScheduler
+ throw new YarnException("Fifo Scheduler is not supported yet.");
+ }
+ rmConf.setClass(
+ CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+ TableMapping.class, DNSToSwitchMapping.class);
+ rmConf.set(
+ CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY,
+ tableMapping);
+ rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir);
+
+ rm = new ResourceManager() {
+ @Override
+ protected ApplicationMasterLauncher createAMLauncher() {
+ return new MockAMLauncher(slsRunner, this.rmContext);
+ }
+ };
+
+ // Across runs of parametrized tests, the JvmMetrics object is retained,
+ // but is not registered correctly
+ JvmMetrics jvmMetrics = JvmMetrics.initSingleton("ResourceManager", null);
+ jvmMetrics.registerIfNeeded();
+
+ // Init and start the actual ResourceManager
+ rm.init(rmConf);
+ rm.start();
+ }
+
+ public void increaseQueueAppNum(String queue) throws YarnException {
+ SchedulerWrapper wrapper = (SchedulerWrapper)rm.getResourceScheduler();
+ String queueName = wrapper.getRealQueueName(queue);
+ Integer appNum = queueAppNumMap.get(queueName);
+ if (appNum == null) {
+ appNum = 1;
+ } else {
+ appNum = appNum + 1;
+ }
+
+ queueAppNumMap.put(queueName, appNum);
+ SchedulerMetrics metrics = wrapper.getSchedulerMetrics();
+ if (metrics != null) {
+ metrics.trackQueue(queueName);
+ }
+ }
+
+ public void setMetricsOutputDir(String metricsOutputDir) {
+ this.metricsOutputDir = metricsOutputDir;
+ }
+
+ public String getTableMapping() {
+ return tableMapping;
+ }
+
+ public void setTableMapping(String tableMapping) {
+ this.tableMapping = tableMapping;
+ }
+
+ public void stop() {
+ rm.stop();
+ }
+
+ public ResourceManager getRm() {
+ return rm;
+ }
+
+ public Map<String, Integer> getQueueAppNumMap() {
+ return queueAppNumMap;
+ }
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 2110e3c..e9ae7f5 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -53,11 +53,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.metrics2.source.JvmMetrics;
-import org.apache.hadoop.net.DNSToSwitchMapping;
-import org.apache.hadoop.net.TableMapping;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -66,24 +62,14 @@ import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
-import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher;
-import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler;
-import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler;
-import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
+import org.apache.hadoop.yarn.sls.scheduler.Tracker;
import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
import org.apache.hadoop.yarn.sls.utils.SLSUtils;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@@ -109,11 +95,8 @@ import java.util.concurrent.TimeUnit;
@Private
@Unstable
public class SLSRunner extends Configured implements Tool {
- // RM, Runner
- private ResourceManager rm;
private static TaskRunner runner = new TaskRunner();
private String[] inputTraces;
- private Map<String, Integer> queueAppNumMap;
private int poolSize;
// NM simulator
@@ -122,12 +105,10 @@ public class SLSRunner extends Configured implements Tool {
private String nodeFile;
// metrics
- private String metricsOutputDir;
private boolean printSimulation;
// other simulation information
private int numNMs, numRacks;
- private String tableMapping;
private final static Map<String, Object> simulateInfoMap = new HashMap<>();
@@ -136,6 +117,7 @@ public class SLSRunner extends Configured implements Tool {
private static boolean exitAtTheFinish = false;
private AMRunner amRunner;
+ private RMRunner rmRunner;
/**
* The type of trace in input.
@@ -179,8 +161,8 @@ public class SLSRunner extends Configured implements Tool {
setConf(tempConf);
nmMap = new ConcurrentHashMap<>();
- queueAppNumMap = new HashMap<>();
amRunner = new AMRunner(runner, this);
+ rmRunner = new RMRunner(tempConf, this);
// runner
poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE,
@@ -225,12 +207,12 @@ public class SLSRunner extends Configured implements Tool {
* @param inType
* @param inTraces
* @param nodes
- * @param outDir
+ * @param metricsOutputDir
* @param trackApps
* @param printsimulation
*/
public void setSimulationParams(TraceType inType, String[] inTraces,
- String nodes, String outDir, Set<String> trackApps,
+ String nodes, String metricsOutputDir, Set<String> trackApps,
boolean printsimulation) {
this.inputType = inType;
@@ -240,8 +222,8 @@ public class SLSRunner extends Configured implements Tool {
this.amRunner.setTrackedApps(trackApps);
this.nodeFile = nodes;
this.printSimulation = printsimulation;
- metricsOutputDir = outDir;
- tableMapping = outDir + "/tableMapping.csv";
+ this.rmRunner.setMetricsOutputDir(metricsOutputDir);
+ this.rmRunner.setTableMapping(metricsOutputDir + "/tableMapping.csv");
}
public void start() throws IOException, ClassNotFoundException, YarnException,
@@ -250,17 +232,19 @@ public class SLSRunner extends Configured implements Tool {
enableDNSCaching(getConf());
// start resource manager
- startRM();
- amRunner.setResourceManager(rm);
+ rmRunner.startRM();
+ amRunner.setResourceManager(rmRunner.getRm());
// start node managers
startNM();
// start application masters
amRunner.startAM();
+
// set queue & tracked apps information
- ((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
- .setQueueSet(this.queueAppNumMap.keySet());
- ((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
- .setTrackedAppSet(amRunner.getTrackedApps());
+ SchedulerWrapper resourceScheduler =
+ (SchedulerWrapper) rmRunner.getRm().getResourceScheduler();
+ Tracker tracker = resourceScheduler.getTracker();
+ tracker.setQueueSet(rmRunner.getQueueAppNumMap().keySet());
+ tracker.setTrackedAppSet(amRunner.getTrackedApps());
// print out simulation info
printSimulationInfo();
// blocked until all nodes RUNNING
@@ -286,49 +270,6 @@ public class SLSRunner extends Configured implements Tool {
}
}
- private void startRM() throws ClassNotFoundException, YarnException {
- Configuration rmConf = new YarnConfiguration(getConf());
- String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER);
-
- if (Class.forName(schedulerClass) == CapacityScheduler.class) {
- rmConf.set(YarnConfiguration.RM_SCHEDULER,
- SLSCapacityScheduler.class.getName());
- rmConf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
- rmConf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
- ProportionalCapacityPreemptionPolicy.class.getName());
- } else if (Class.forName(schedulerClass) == FairScheduler.class) {
- rmConf.set(YarnConfiguration.RM_SCHEDULER,
- SLSFairScheduler.class.getName());
- } else if (Class.forName(schedulerClass) == FifoScheduler.class) {
- // TODO add support for FifoScheduler
- throw new YarnException("Fifo Scheduler is not supported yet.");
- }
- rmConf.setClass(
- CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
- TableMapping.class, DNSToSwitchMapping.class);
- rmConf.set(
- CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY,
- tableMapping);
- rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir);
-
- final SLSRunner se = this;
- rm = new ResourceManager() {
- @Override
- protected ApplicationMasterLauncher createAMLauncher() {
- return new MockAMLauncher(se, this.rmContext);
- }
- };
-
- // Across runs of parametrized tests, the JvmMetrics objects is retained,
- // but is not registered correctly
- JvmMetrics jvmMetrics = JvmMetrics.initSingleton("ResourceManager", null);
- jvmMetrics.registerIfNeeded();
-
- // Init and start the actual ResourceManager
- rm.init(rmConf);
- rm.start();
- }
-
private void startNM() throws YarnException, IOException,
InterruptedException {
// nm configuration
@@ -368,7 +309,7 @@ public class SLSRunner extends Configured implements Tool {
throw new YarnException("No node! Please configure nodes.");
}
- SLSUtils.generateNodeTableMapping(nodeSet, tableMapping);
+ SLSUtils.generateNodeTableMapping(nodeSet, rmRunner.getTableMapping());
// create NM simulators
Random random = new Random();
@@ -391,7 +332,7 @@ public class SLSRunner extends Configured implements Tool {
Set<NodeLabel> nodeLabels = nodeDetails.getLabels();
nm.init(hostName, nmResource,
random.nextInt(heartbeatInterval),
- heartbeatInterval, rm, resourceUtilizationRatio, nodeLabels);
+ heartbeatInterval, rmRunner.getRm(), resourceUtilizationRatio, nodeLabels);
nmMap.put(nm.getNode().getNodeID(), nm);
runner.schedule(nm);
rackSet.add(nm.getNode().getRackName());
@@ -411,7 +352,7 @@ public class SLSRunner extends Configured implements Tool {
long startTimeMS = System.currentTimeMillis();
while (true) {
int numRunningNodes = 0;
- for (RMNode node : rm.getRMContext().getRMNodes().values()) {
+ for (RMNode node : rmRunner.getRm().getRMContext().getRMNodes().values()) {
if (node.getState() == NodeState.RUNNING) {
numRunningNodes++;
}
@@ -435,21 +376,8 @@ public class SLSRunner extends Configured implements Tool {
return Resources.createResource(containerMemory, containerVCores);
}
- void increaseQueueAppNum(String queue) throws YarnException {
- SchedulerWrapper wrapper = (SchedulerWrapper)rm.getResourceScheduler();
- String queueName = wrapper.getRealQueueName(queue);
- Integer appNum = queueAppNumMap.get(queueName);
- if (appNum == null) {
- appNum = 1;
- } else {
- appNum = appNum + 1;
- }
-
- queueAppNumMap.put(queueName, appNum);
- SchedulerMetrics metrics = wrapper.getSchedulerMetrics();
- if (metrics != null) {
- metrics.trackQueue(queueName);
- }
+ public void increaseQueueAppNum(String queue) throws YarnException {
+ rmRunner.increaseQueueAppNum(queue);
}
private void printSimulationInfo() {
@@ -457,6 +385,7 @@ public class SLSRunner extends Configured implements Tool {
final int numTasks = amRunner.getNumTasks();
final long maxRuntime = amRunner.getMaxRuntime();
Map<String, AMSimulator> amMap = amRunner.getAmMap();
+ Map<String, Integer> queueAppNumMap = rmRunner.getQueueAppNumMap();
if (printSimulation) {
// node
@@ -523,7 +452,7 @@ public class SLSRunner extends Configured implements Tool {
}
public void stop() throws InterruptedException {
- rm.stop();
+ rmRunner.stop();
runner.stop();
}
@@ -696,14 +625,14 @@ public class SLSRunner extends Configured implements Tool {
}
}
- public ResourceManager getRm() {
- return rm;
- }
-
public SynthTraceJobProducer getStjp() {
return stjp;
}
+ public void setStjp(SynthTraceJobProducer stjp) {
+ this.stjp = stjp;
+ }
+
public AMSimulator getAMSimulatorByAppId(ApplicationId appId) {
return amRunner.getAMSimulator(appId);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org