Posted to common-commits@hadoop.apache.org by qu...@apache.org on 2022/03/29 07:58:50 UTC

[hadoop] branch trunk updated: YARN-10549. Decouple RM runner logic from SLSRunner. Contributed by Szilard Nemeth.

This is an automated email from the ASF dual-hosted git repository.

quapaw pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e386d6a  YARN-10549. Decouple RM runner logic from SLSRunner. Contributed by Szilard Nemeth.
e386d6a is described below

commit e386d6a6617450b827745ee6a0a856166d37d450
Author: 9uapaw <gy...@gmail.com>
AuthorDate: Tue Mar 29 09:52:39 2022 +0200

    YARN-10549. Decouple RM runner logic from SLSRunner. Contributed by Szilard Nemeth.
---
 .../java/org/apache/hadoop/yarn/sls/AMRunner.java  |   1 +
 .../java/org/apache/hadoop/yarn/sls/RMRunner.java  | 137 +++++++++++++++++++++
 .../java/org/apache/hadoop/yarn/sls/SLSRunner.java | 123 ++++--------------
 3 files changed, 164 insertions(+), 97 deletions(-)
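
For orientation before reading the diff: a minimal sketch of how the extracted RMRunner is driven.
The RMRunner calls mirror the API introduced below; the standalone SLSRunner construction, the
output paths, and running this outside a full simulation are assumptions for illustration only,
not part of the commit.

    // RMRunnerSketch.java -- illustrative only, not shipped with this change.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
    import org.apache.hadoop.yarn.sls.RMRunner;
    import org.apache.hadoop.yarn.sls.SLSRunner;

    public class RMRunnerSketch {
      public static void main(String[] args) throws Exception {
        // Assumption: SLSRunner normally builds its own RMRunner internally; one is
        // constructed directly here just to show the calls SLSRunner now delegates.
        SLSRunner sls = new SLSRunner();
        RMRunner rmRunner = new RMRunner(new Configuration(), sls);

        // SLSRunner.setSimulationParams() now forwards these two values to RMRunner
        // (the paths below are hypothetical).
        rmRunner.setMetricsOutputDir("/tmp/sls-out");
        rmRunner.setTableMapping("/tmp/sls-out/tableMapping.csv");

        // Swaps in the SLS scheduler wrapper and starts the ResourceManager.
        rmRunner.startRM();
        // The RM instance is handed to AMRunner and the NM simulators.
        ResourceManager rm = rmRunner.getRm();
        // Per-queue application counting also moved from SLSRunner to RMRunner.
        rmRunner.increaseQueueAppNum("default");
        rmRunner.stop();
      }
    }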

diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java
index da95c68..301b426 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java
@@ -151,6 +151,7 @@ public class AMRunner {
     // If the nodeFile is used, this may not have been initialized yet.
     if (stjp == null) {
       stjp = new SynthTraceJobProducer(conf, new Path(inputTraces[0]));
+      slsRunner.setStjp(stjp);
     }
 
     SynthJob job;
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/RMRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/RMRunner.java
new file mode 100644
index 0000000..dbded4b
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/RMRunner.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.TableMapping;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher;
+import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler;
+import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler;
+import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
+import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
+import java.util.HashMap;
+import java.util.Map;
+
+public class RMRunner {
+  private ResourceManager rm;
+  private String metricsOutputDir;
+  private Configuration conf;
+  private SLSRunner slsRunner;
+  private String tableMapping;
+  private Map<String, Integer> queueAppNumMap;
+
+  public RMRunner(Configuration conf, SLSRunner slsRunner) {
+    this.conf = conf;
+    this.slsRunner = slsRunner;
+    this.queueAppNumMap = new HashMap<>();
+  }
+
+  public void startRM() throws ClassNotFoundException, YarnException {
+    Configuration rmConf = new YarnConfiguration(conf);
+    String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER);
+
+    if (Class.forName(schedulerClass) == CapacityScheduler.class) {
+      rmConf.set(YarnConfiguration.RM_SCHEDULER,
+          SLSCapacityScheduler.class.getName());
+      rmConf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+      rmConf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+          ProportionalCapacityPreemptionPolicy.class.getName());
+    } else if (Class.forName(schedulerClass) == FairScheduler.class) {
+      rmConf.set(YarnConfiguration.RM_SCHEDULER,
+          SLSFairScheduler.class.getName());
+    } else if (Class.forName(schedulerClass) == FifoScheduler.class) {
+      // TODO add support for FifoScheduler
+      throw new YarnException("Fifo Scheduler is not supported yet.");
+    }
+    rmConf.setClass(
+        CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+        TableMapping.class, DNSToSwitchMapping.class);
+    rmConf.set(
+        CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY,
+        tableMapping);
+    rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir);
+
+    rm = new ResourceManager() {
+      @Override
+      protected ApplicationMasterLauncher createAMLauncher() {
+        return new MockAMLauncher(slsRunner, this.rmContext);
+      }
+    };
+
+    // Across runs of parameterized tests, the JvmMetrics object is retained
+    // but is not registered correctly.
+    JvmMetrics jvmMetrics = JvmMetrics.initSingleton("ResourceManager", null);
+    jvmMetrics.registerIfNeeded();
+
+    // Init and start the actual ResourceManager
+    rm.init(rmConf);
+    rm.start();
+  }
+
+  public void increaseQueueAppNum(String queue) throws YarnException {
+    SchedulerWrapper wrapper = (SchedulerWrapper)rm.getResourceScheduler();
+    String queueName = wrapper.getRealQueueName(queue);
+    Integer appNum = queueAppNumMap.get(queueName);
+    if (appNum == null) {
+      appNum = 1;
+    } else {
+      appNum = appNum + 1;
+    }
+
+    queueAppNumMap.put(queueName, appNum);
+    SchedulerMetrics metrics = wrapper.getSchedulerMetrics();
+    if (metrics != null) {
+      metrics.trackQueue(queueName);
+    }
+  }
+
+  public void setMetricsOutputDir(String metricsOutputDir) {
+    this.metricsOutputDir = metricsOutputDir;
+  }
+
+  public String getTableMapping() {
+    return tableMapping;
+  }
+
+  public void setTableMapping(String tableMapping) {
+    this.tableMapping = tableMapping;
+  }
+
+  public void stop() {
+    rm.stop();
+  }
+
+  public ResourceManager getRm() {
+    return rm;
+  }
+
+  public Map<String, Integer> getQueueAppNumMap() {
+    return queueAppNumMap;
+  }
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 2110e3c..e9ae7f5 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -53,11 +53,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.metrics2.source.JvmMetrics;
-import org.apache.hadoop.net.DNSToSwitchMapping;
-import org.apache.hadoop.net.TableMapping;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -66,24 +62,14 @@ import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
 import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
-import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher;
-import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler;
-import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler;
-import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
 import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
 import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
+import org.apache.hadoop.yarn.sls.scheduler.Tracker;
 import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
 import org.apache.hadoop.yarn.sls.utils.SLSUtils;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@@ -109,11 +95,8 @@ import java.util.concurrent.TimeUnit;
 @Private
 @Unstable
 public class SLSRunner extends Configured implements Tool {
-  // RM, Runner
-  private ResourceManager rm;
   private static TaskRunner runner = new TaskRunner();
   private String[] inputTraces;
-  private Map<String, Integer> queueAppNumMap;
   private int poolSize;
 
   // NM simulator
@@ -122,12 +105,10 @@ public class SLSRunner extends Configured implements Tool {
   private String nodeFile;
 
   // metrics
-  private String metricsOutputDir;
   private boolean printSimulation;
 
   // other simulation information
   private int numNMs, numRacks;
-  private String tableMapping;
 
   private final static Map<String, Object> simulateInfoMap = new HashMap<>();
 
@@ -136,6 +117,7 @@ public class SLSRunner extends Configured implements Tool {
 
   private static boolean exitAtTheFinish = false;
   private AMRunner amRunner;
+  private RMRunner rmRunner;
 
   /**
    * The type of trace in input.
@@ -179,8 +161,8 @@ public class SLSRunner extends Configured implements Tool {
     setConf(tempConf);
 
     nmMap = new ConcurrentHashMap<>();
-    queueAppNumMap = new HashMap<>();
     amRunner = new AMRunner(runner, this);
+    rmRunner = new RMRunner(tempConf, this);
 
     // runner
     poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE,
@@ -225,12 +207,12 @@ public class SLSRunner extends Configured implements Tool {
    * @param inType
    * @param inTraces
    * @param nodes
-   * @param outDir
+   * @param metricsOutputDir
    * @param trackApps
    * @param printsimulation
    */
   public void setSimulationParams(TraceType inType, String[] inTraces,
-      String nodes, String outDir, Set<String> trackApps,
+      String nodes, String metricsOutputDir, Set<String> trackApps,
       boolean printsimulation) {
 
     this.inputType = inType;
@@ -240,8 +222,8 @@ public class SLSRunner extends Configured implements Tool {
     this.amRunner.setTrackedApps(trackApps);
     this.nodeFile = nodes;
     this.printSimulation = printsimulation;
-    metricsOutputDir = outDir;
-    tableMapping = outDir + "/tableMapping.csv";
+    this.rmRunner.setMetricsOutputDir(metricsOutputDir);
+    this.rmRunner.setTableMapping(metricsOutputDir + "/tableMapping.csv");
   }
 
   public void start() throws IOException, ClassNotFoundException, YarnException,
@@ -250,17 +232,19 @@ public class SLSRunner extends Configured implements Tool {
     enableDNSCaching(getConf());
 
     // start resource manager
-    startRM();
-    amRunner.setResourceManager(rm);
+    rmRunner.startRM();
+    amRunner.setResourceManager(rmRunner.getRm());
     // start node managers
     startNM();
     // start application masters
     amRunner.startAM();
+
     // set queue & tracked apps information
-    ((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
-        .setQueueSet(this.queueAppNumMap.keySet());
-    ((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
-        .setTrackedAppSet(amRunner.getTrackedApps());
+    SchedulerWrapper resourceScheduler =
+        (SchedulerWrapper) rmRunner.getRm().getResourceScheduler();
+    Tracker tracker = resourceScheduler.getTracker();
+    tracker.setQueueSet(rmRunner.getQueueAppNumMap().keySet());
+    tracker.setTrackedAppSet(amRunner.getTrackedApps());
     // print out simulation info
     printSimulationInfo();
     // blocked until all nodes RUNNING
@@ -286,49 +270,6 @@ public class SLSRunner extends Configured implements Tool {
     }
   }
 
-  private void startRM() throws ClassNotFoundException, YarnException {
-    Configuration rmConf = new YarnConfiguration(getConf());
-    String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER);
-
-    if (Class.forName(schedulerClass) == CapacityScheduler.class) {
-      rmConf.set(YarnConfiguration.RM_SCHEDULER,
-          SLSCapacityScheduler.class.getName());
-      rmConf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
-      rmConf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
-          ProportionalCapacityPreemptionPolicy.class.getName());
-    } else if (Class.forName(schedulerClass) == FairScheduler.class) {
-      rmConf.set(YarnConfiguration.RM_SCHEDULER,
-          SLSFairScheduler.class.getName());
-    } else if (Class.forName(schedulerClass) == FifoScheduler.class) {
-      // TODO add support for FifoScheduler
-      throw new YarnException("Fifo Scheduler is not supported yet.");
-    }
-    rmConf.setClass(
-        CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
-        TableMapping.class, DNSToSwitchMapping.class);
-    rmConf.set(
-        CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY,
-        tableMapping);
-    rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir);
-
-    final SLSRunner se = this;
-    rm = new ResourceManager() {
-      @Override
-      protected ApplicationMasterLauncher createAMLauncher() {
-        return new MockAMLauncher(se, this.rmContext);
-      }
-    };
-
-    // Across runs of parameterized tests, the JvmMetrics object is retained
-    // but is not registered correctly.
-    JvmMetrics jvmMetrics = JvmMetrics.initSingleton("ResourceManager", null);
-    jvmMetrics.registerIfNeeded();
-
-    // Init and start the actual ResourceManager
-    rm.init(rmConf);
-    rm.start();
-  }
-
   private void startNM() throws YarnException, IOException,
       InterruptedException {
     // nm configuration
@@ -368,7 +309,7 @@ public class SLSRunner extends Configured implements Tool {
       throw new YarnException("No node! Please configure nodes.");
     }
 
-    SLSUtils.generateNodeTableMapping(nodeSet, tableMapping);
+    SLSUtils.generateNodeTableMapping(nodeSet, rmRunner.getTableMapping());
 
     // create NM simulators
     Random random = new Random();
@@ -391,7 +332,7 @@ public class SLSRunner extends Configured implements Tool {
             Set<NodeLabel> nodeLabels = nodeDetails.getLabels();
             nm.init(hostName, nmResource,
                 random.nextInt(heartbeatInterval),
-                heartbeatInterval, rm, resourceUtilizationRatio, nodeLabels);
+                heartbeatInterval, rmRunner.getRm(), resourceUtilizationRatio, nodeLabels);
             nmMap.put(nm.getNode().getNodeID(), nm);
             runner.schedule(nm);
             rackSet.add(nm.getNode().getRackName());
@@ -411,7 +352,7 @@ public class SLSRunner extends Configured implements Tool {
     long startTimeMS = System.currentTimeMillis();
     while (true) {
       int numRunningNodes = 0;
-      for (RMNode node : rm.getRMContext().getRMNodes().values()) {
+      for (RMNode node : rmRunner.getRm().getRMContext().getRMNodes().values()) {
         if (node.getState() == NodeState.RUNNING) {
           numRunningNodes++;
         }
@@ -435,21 +376,8 @@ public class SLSRunner extends Configured implements Tool {
     return Resources.createResource(containerMemory, containerVCores);
   }
 
-  void increaseQueueAppNum(String queue) throws YarnException {
-    SchedulerWrapper wrapper = (SchedulerWrapper)rm.getResourceScheduler();
-    String queueName = wrapper.getRealQueueName(queue);
-    Integer appNum = queueAppNumMap.get(queueName);
-    if (appNum == null) {
-      appNum = 1;
-    } else {
-      appNum = appNum + 1;
-    }
-
-    queueAppNumMap.put(queueName, appNum);
-    SchedulerMetrics metrics = wrapper.getSchedulerMetrics();
-    if (metrics != null) {
-      metrics.trackQueue(queueName);
-    }
+  public void increaseQueueAppNum(String queue) throws YarnException {
+    rmRunner.increaseQueueAppNum(queue);
   }
 
   private void printSimulationInfo() {
@@ -457,6 +385,7 @@ public class SLSRunner extends Configured implements Tool {
     final int numTasks = amRunner.getNumTasks();
     final long maxRuntime = amRunner.getMaxRuntime();
     Map<String, AMSimulator> amMap = amRunner.getAmMap();
+    Map<String, Integer> queueAppNumMap = rmRunner.getQueueAppNumMap();
 
     if (printSimulation) {
       // node
@@ -523,7 +452,7 @@ public class SLSRunner extends Configured implements Tool {
   }
 
   public void stop() throws InterruptedException {
-    rm.stop();
+    rmRunner.stop();
     runner.stop();
   }
 
@@ -696,14 +625,14 @@ public class SLSRunner extends Configured implements Tool {
     }
   }
 
-  public ResourceManager getRm() {
-    return rm;
-  }
-
   public SynthTraceJobProducer getStjp() {
     return stjp;
   }
 
+  public void setStjp(SynthTraceJobProducer stjp) {
+    this.stjp = stjp;
+  }
+
   public AMSimulator getAMSimulatorByAppId(ApplicationId appId) {
     return amRunner.getAMSimulator(appId);
   }
