You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by qu...@apache.org on 2022/03/25 17:49:26 UTC

[hadoop] branch trunk updated: YARN-10548. Decouple AM runner logic from SLSRunner. Contributed by Szilard Nemeth.

This is an automated email from the ASF dual-hosted git repository.

quapaw pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 08a77a7  YARN-10548. Decouple AM runner logic from SLSRunner. Contributed by Szilard Nemeth.
08a77a7 is described below

commit 08a77a765ba635da1cc44f36b103116605a517ee
Author: 9uapaw <gy...@gmail.com>
AuthorDate: Fri Mar 25 18:48:56 2022 +0100

    YARN-10548. Decouple AM runner logic from SLSRunner. Contributed by Szilard Nemeth.
---
 .../java/org/apache/hadoop/yarn/sls/AMRunner.java  | 297 +++++++++++++++++++++
 .../java/org/apache/hadoop/yarn/sls/SLSRunner.java | 250 ++++-------------
 .../yarn/sls/resourcemanager/MockAMLauncher.java   |  17 +-
 3 files changed, 354 insertions(+), 210 deletions(-)

diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java
new file mode 100644
index 0000000..da95c68
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.rumen.JobTraceReader;
+import org.apache.hadoop.tools.rumen.LoggedJob;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.sls.SLSRunner.TraceType;
+import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
+import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
+import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
+import org.apache.hadoop.yarn.util.UTCClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class AMRunner {
+  private static final Logger LOG = LoggerFactory.getLogger(AMRunner.class);
+  static int REMAINING_APPS = 0;
+
+  private final Configuration conf;
+  private int AM_ID;
+  private Map<String, AMSimulator> amMap;
+  private Map<ApplicationId, AMSimulator> appIdAMSim;
+  private Set<String> trackedApps;
+  private Map<String, Class> amClassMap;
+  private TraceType inputType;
+  private String[] inputTraces;
+  private SynthTraceJobProducer stjp;
+  private TaskRunner runner;
+  private SLSRunner slsRunner;
+  private int numAMs, numTasks;
+  private long maxRuntime;
+  private ResourceManager rm;
+
+  public AMRunner(TaskRunner runner, SLSRunner slsRunner) {
+    this.runner = runner;
+    this.slsRunner = slsRunner;
+    this.conf = slsRunner.getConf();
+  }
+
+
+  public void init(Configuration conf) throws ClassNotFoundException {
+    amMap = new ConcurrentHashMap<>();
+    amClassMap = new HashMap<>();
+    appIdAMSim = new ConcurrentHashMap<>();
+    // <AMType, Class> map
+    for (Map.Entry e : conf) {
+      String key = e.getKey().toString();
+      if (key.startsWith(SLSConfiguration.AM_TYPE_PREFIX)) {
+        String amType = key.substring(SLSConfiguration.AM_TYPE_PREFIX.length());
+        amClassMap.put(amType, Class.forName(conf.get(key)));
+      }
+    }
+  }
+
+  public void startAM() throws YarnException, IOException {
+    switch (inputType) {
+      case SLS:
+        for (String inputTrace : inputTraces) {
+          startAMFromSLSTrace(inputTrace);
+        }
+        break;
+      case RUMEN:
+        long baselineTimeMS = 0;
+        for (String inputTrace : inputTraces) {
+          startAMFromRumenTrace(inputTrace, baselineTimeMS);
+        }
+        break;
+      case SYNTH:
+        startAMFromSynthGenerator();
+        break;
+      default:
+        throw new YarnException("Input configuration not recognized, "
+            + "trace type should be SLS, RUMEN, or SYNTH");
+    }
+
+    numAMs = amMap.size();
+    REMAINING_APPS = numAMs;
+  }
+
+  /**
+   * Parse workload from a SLS trace file.
+   */
+  private void startAMFromSLSTrace(String inputTrace) throws IOException {
+    JsonFactory jsonF = new JsonFactory();
+    ObjectMapper mapper = new ObjectMapper();
+
+    try (Reader input = new InputStreamReader(
+        new FileInputStream(inputTrace), StandardCharsets.UTF_8)) {
+      JavaType type = mapper.getTypeFactory().
+          constructMapType(Map.class, String.class, String.class);
+      Iterator<Map<String, String>> jobIter = mapper.readValues(
+          jsonF.createParser(input), type);
+
+      while (jobIter.hasNext()) {
+        try {
+          Map<String, String> jsonJob = jobIter.next();
+          AMDefinitionSLS amDef = AMDefinitionFactory.createFromSlsTrace(jsonJob, slsRunner);
+          startAMs(amDef);
+        } catch (Exception e) {
+          LOG.error("Failed to create an AM: {}", e.getMessage());
+        }
+      }
+    }
+  }
+
+  /**
+   * parse workload information from synth-generator trace files.
+   */
+  private void startAMFromSynthGenerator() throws YarnException, IOException {
+    Configuration localConf = new Configuration();
+    localConf.set("fs.defaultFS", "file:///");
+    // if we use the nodeFile this could have been not initialized yet.
+    if (stjp == null) {
+      stjp = new SynthTraceJobProducer(conf, new Path(inputTraces[0]));
+    }
+
+    SynthJob job;
+    // we use stjp, a reference to the job producer instantiated during node
+    // creation
+    while ((job = (SynthJob) stjp.getNextJob()) != null) {
+      ReservationId reservationId = null;
+      if (job.hasDeadline()) {
+        reservationId = ReservationId
+            .newInstance(rm.getStartTime(), AM_ID);
+      }
+      AMDefinitionSynth amDef = AMDefinitionFactory.createFromSynth(job, slsRunner);
+      startAMs(amDef, reservationId, job.getParams(), job.getDeadline());
+    }
+  }
+
+  /**
+   * Parse workload from a rumen trace file.
+   */
+  private void startAMFromRumenTrace(String inputTrace, long baselineTimeMS)
+      throws IOException {
+    Configuration conf = new Configuration();
+    conf.set("fs.defaultFS", "file:///");
+    File fin = new File(inputTrace);
+
+    try (JobTraceReader reader = new JobTraceReader(
+        new Path(fin.getAbsolutePath()), conf)) {
+      LoggedJob job = reader.getNext();
+
+      while (job != null) {
+        try {
+          AMDefinitionRumen amDef =
+              AMDefinitionFactory.createFromRumenTrace(job, baselineTimeMS,
+                  slsRunner);
+          startAMs(amDef);
+        } catch (Exception e) {
+          LOG.error("Failed to create an AM", e);
+        }
+        job = reader.getNext();
+      }
+    }
+  }
+
+  private void startAMs(AMDefinition amDef) {
+    for (int i = 0; i < amDef.getJobCount(); i++) {
+      JobDefinition jobDef = JobDefinition.Builder.create()
+          .withAmDefinition(amDef)
+          .withDeadline(-1)
+          .withReservationId(null)
+          .withParams(null)
+          .build();
+      runNewAM(jobDef);
+    }
+  }
+
+  private void startAMs(AMDefinition amDef,
+      ReservationId reservationId,
+      Map<String, String> params, long deadline) {
+    for (int i = 0; i < amDef.getJobCount(); i++) {
+      JobDefinition jobDef = JobDefinition.Builder.create()
+          .withAmDefinition(amDef)
+          .withReservationId(reservationId)
+          .withParams(params)
+          .withDeadline(deadline)
+          .build();
+      runNewAM(jobDef);
+    }
+  }
+
+  private void runNewAM(JobDefinition jobDef) {
+    AMDefinition amDef = jobDef.getAmDefinition();
+    String oldJobId = amDef.getOldAppId();
+    AMSimulator amSim =
+        createAmSimulator(amDef.getAmType());
+
+    if (amSim != null) {
+      int heartbeatInterval = conf.getInt(
+          SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS,
+          SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
+      boolean isTracked = trackedApps.contains(oldJobId);
+
+      if (oldJobId == null) {
+        oldJobId = Integer.toString(AM_ID);
+      }
+      AM_ID++;
+      amSim.init(amDef, rm, slsRunner, isTracked, runner.getStartTimeMS(), heartbeatInterval, appIdAMSim);
+      if (jobDef.getReservationId() != null) {
+        // if we have a ReservationId, delegate reservation creation to
+        // AMSim (reservation shape is impl specific)
+        UTCClock clock = new UTCClock();
+        amSim.initReservation(jobDef.getReservationId(), jobDef.getDeadline(), clock.getTime());
+      }
+      runner.schedule(amSim);
+      maxRuntime = Math.max(maxRuntime, amDef.getJobFinishTime());
+      numTasks += amDef.getTaskContainers().size();
+      amMap.put(oldJobId, amSim);
+    }
+  }
+
+  private AMSimulator createAmSimulator(String jobType) {
+    return (AMSimulator) ReflectionUtils.newInstance(
+        amClassMap.get(jobType), new Configuration());
+  }
+
+  public AMSimulator getAMSimulator(ApplicationId appId) {
+    return appIdAMSim.get(appId);
+  }
+
+  public void setInputType(TraceType inputType) {
+    this.inputType = inputType;
+  }
+
+  public void setInputTraces(String[] inputTraces) {
+    this.inputTraces = inputTraces;
+  }
+
+  public void setResourceManager(ResourceManager rm) {
+    this.rm = rm;
+  }
+
+  public Set<String> getTrackedApps() {
+    return trackedApps;
+  }
+
+  public void setTrackedApps(Set<String> trackApps) {
+    this.trackedApps = trackApps;
+  }
+
+  public int getNumAMs() {
+    return numAMs;
+  }
+
+  public int getNumTasks() {
+    return numTasks;
+  }
+
+  public long getMaxRuntime() {
+    return maxRuntime;
+  }
+
+  public Map<String, AMSimulator> getAmMap() {
+    return amMap;
+  }
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 260a600..48ad610 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -58,16 +58,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.TableMapping;
-import org.apache.hadoop.tools.rumen.JobTraceReader;
-import org.apache.hadoop.tools.rumen.LoggedJob;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -84,19 +80,32 @@ import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
 import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
 import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher;
 import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler;
-import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
-import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
 import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler;
+import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
 import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
-import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
+import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
 import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
 import org.apache.hadoop.yarn.sls.utils.SLSUtils;
-import org.apache.hadoop.yarn.util.UTCClock;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.security.Security;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
 @Private
 @Unstable
 public class SLSRunner extends Configured implements Tool {
@@ -112,21 +121,12 @@ public class SLSRunner extends Configured implements Tool {
   private Resource nodeManagerResource;
   private String nodeFile;
 
-  // AM simulator
-  private int AM_ID;
-  private Map<String, AMSimulator> amMap;
-  private Map<ApplicationId, AMSimulator> appIdAMSim;
-  private Set<String> trackedApps;
-  private Map<String, Class> amClassMap;
-  private static int remainingApps = 0;
-
   // metrics
   private String metricsOutputDir;
   private boolean printSimulation;
 
   // other simulation information
-  private int numNMs, numRacks, numAMs, numTasks;
-  private long maxRuntime;
+  private int numNMs, numRacks;
   private String tableMapping;
 
   private final static Map<String, Object> simulateInfoMap = new HashMap<>();
@@ -135,6 +135,7 @@ public class SLSRunner extends Configured implements Tool {
   public final static Logger LOG = LoggerFactory.getLogger(SLSRunner.class);
 
   private static boolean exitAtTheFinish = false;
+  private AMRunner amRunner;
 
   /**
    * The type of trace in input.
@@ -151,7 +152,7 @@ public class SLSRunner extends Configured implements Tool {
   private SynthTraceJobProducer stjp;
 
   public static int getRemainingApps() {
-    return remainingApps;
+    return AMRunner.REMAINING_APPS;
   }
 
   public SLSRunner() throws ClassNotFoundException {
@@ -176,9 +177,7 @@ public class SLSRunner extends Configured implements Tool {
   private void init(Configuration tempConf) throws ClassNotFoundException {
     nmMap = new ConcurrentHashMap<>();
     queueAppNumMap = new HashMap<>();
-    amMap = new ConcurrentHashMap<>();
-    amClassMap = new HashMap<>();
-    appIdAMSim = new ConcurrentHashMap<>();
+    amRunner = new AMRunner(runner, this);
     // runner configuration
     setConf(tempConf);
 
@@ -186,15 +185,8 @@ public class SLSRunner extends Configured implements Tool {
     poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE,
         SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
     SLSRunner.runner.setQueueSize(poolSize);
-    // <AMType, Class> map
-    for (Map.Entry e : tempConf) {
-      String key = e.getKey().toString();
-      if (key.startsWith(SLSConfiguration.AM_TYPE_PREFIX)) {
-        String amType = key.substring(SLSConfiguration.AM_TYPE_PREFIX.length());
-        amClassMap.put(amType, Class.forName(tempConf.get(key)));
-      }
-    }
 
+    amRunner.init(tempConf);
     nodeManagerResource = getNodeManagerResource();
   }
 
@@ -227,14 +219,25 @@ public class SLSRunner extends Configured implements Tool {
     return Collections.unmodifiableMap(simulateInfoMap);
   }
 
+  /**
+   * This is invoked before start.
+   * @param inType
+   * @param inTraces
+   * @param nodes
+   * @param outDir
+   * @param trackApps
+   * @param printsimulation
+   */
   public void setSimulationParams(TraceType inType, String[] inTraces,
       String nodes, String outDir, Set<String> trackApps,
       boolean printsimulation) {
 
     this.inputType = inType;
     this.inputTraces = inTraces.clone();
+    this.amRunner.setInputType(this.inputType);
+    this.amRunner.setInputTraces(this.inputTraces);
+    this.amRunner.setTrackedApps(trackApps);
     this.nodeFile = nodes;
-    this.trackedApps = trackApps;
     this.printSimulation = printsimulation;
     metricsOutputDir = outDir;
     tableMapping = outDir + "/tableMapping.csv";
@@ -247,15 +250,16 @@ public class SLSRunner extends Configured implements Tool {
 
     // start resource manager
     startRM();
+    amRunner.setResourceManager(rm);
     // start node managers
     startNM();
     // start application masters
-    startAM();
+    amRunner.startAM();
     // set queue & tracked apps information
     ((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
         .setQueueSet(this.queueAppNumMap.keySet());
     ((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
-        .setTrackedAppSet(this.trackedApps);
+        .setTrackedAppSet(amRunner.getTrackedApps());
     // print out simulation info
     printSimulationInfo();
     // blocked until all nodes RUNNING
@@ -310,7 +314,7 @@ public class SLSRunner extends Configured implements Tool {
     rm = new ResourceManager() {
       @Override
       protected ApplicationMasterLauncher createAMLauncher() {
-        return new MockAMLauncher(se, this.rmContext, appIdAMSim);
+        return new MockAMLauncher(se, this.rmContext);
       }
     };
 
@@ -422,109 +426,6 @@ public class SLSRunner extends Configured implements Tool {
         System.currentTimeMillis() - startTimeMS);
   }
 
-  private void startAM() throws YarnException, IOException {
-    switch (inputType) {
-    case SLS:
-      for (String inputTrace : inputTraces) {
-        startAMFromSLSTrace(inputTrace);
-      }
-      break;
-    case RUMEN:
-      long baselineTimeMS = 0;
-      for (String inputTrace : inputTraces) {
-        startAMFromRumenTrace(inputTrace, baselineTimeMS);
-      }
-      break;
-    case SYNTH:
-      startAMFromSynthGenerator();
-      break;
-    default:
-      throw new YarnException("Input configuration not recognized, "
-          + "trace type should be SLS, RUMEN, or SYNTH");
-    }
-
-    numAMs = amMap.size();
-    remainingApps = numAMs;
-  }
-
-  /**
-   * Parse workload from a SLS trace file.
-   */
-  private void startAMFromSLSTrace(String inputTrace) throws IOException {
-    JsonFactory jsonF = new JsonFactory();
-    ObjectMapper mapper = new ObjectMapper();
-
-    try (Reader input = new InputStreamReader(
-        new FileInputStream(inputTrace), StandardCharsets.UTF_8)) {
-      JavaType type = mapper.getTypeFactory().
-          constructMapType(Map.class, String.class, String.class);
-      Iterator<Map<String, String>> jobIter = mapper.readValues(
-          jsonF.createParser(input), type);
-
-      while (jobIter.hasNext()) {
-        try {
-          Map<String, String> jsonJob = jobIter.next();
-          AMDefinitionSLS amDef = AMDefinitionFactory.createFromSlsTrace(jsonJob, this);
-          startAMs(amDef);
-        } catch (Exception e) {
-          LOG.error("Failed to create an AM: {}", e.getMessage());
-        }
-      }
-    }
-  }
-
-  private void startAMs(AMDefinition amDef) {
-    for (int i = 0; i < amDef.getJobCount(); i++) {
-      JobDefinition jobDef = JobDefinition.Builder.create()
-          .withAmDefinition(amDef)
-          .withDeadline(-1)
-          .withReservationId(null)
-          .withParams(null)
-          .build();
-      runNewAM(jobDef);
-    }
-  }
-
-  private void startAMs(AMDefinition amDef, ReservationId reservationId,
-      Map<String, String> params, long deadline) {
-    for (int i = 0; i < amDef.getJobCount(); i++) {
-      JobDefinition jobDef = JobDefinition.Builder.create()
-          .withAmDefinition(amDef)
-          .withReservationId(reservationId)
-          .withParams(params)
-          .withDeadline(deadline)
-          .build();
-      runNewAM(jobDef);
-    }
-  }
-
-  /**
-   * Parse workload from a rumen trace file.
-   */
-  private void startAMFromRumenTrace(String inputTrace, long baselineTimeMS)
-      throws IOException {
-    Configuration conf = new Configuration();
-    conf.set("fs.defaultFS", "file:///");
-    File fin = new File(inputTrace);
-
-    try (JobTraceReader reader = new JobTraceReader(
-        new Path(fin.getAbsolutePath()), conf)) {
-      LoggedJob job = reader.getNext();
-
-      while (job != null) {
-        try {
-          AMDefinitionRumen amDef =
-              AMDefinitionFactory.createFromRumenTrace(job, baselineTimeMS,
-                  this);
-          startAMs(amDef);
-        } catch (Exception e) {
-          LOG.error("Failed to create an AM", e);
-        }
-        job = reader.getNext();
-      }
-    }
-  }
-
   Resource getDefaultContainerResource() {
     int containerMemory = getConf().getInt(SLSConfiguration.CONTAINER_MEMORY_MB,
         SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT);
@@ -533,31 +434,6 @@ public class SLSRunner extends Configured implements Tool {
     return Resources.createResource(containerMemory, containerVCores);
   }
 
-  /**
-   * parse workload information from synth-generator trace files.
-   */
-  private void startAMFromSynthGenerator() throws YarnException, IOException {
-    Configuration localConf = new Configuration();
-    localConf.set("fs.defaultFS", "file:///");
-    // if we use the nodeFile this could have been not initialized yet.
-    if (stjp == null) {
-      stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
-    }
-
-    SynthJob job;
-    // we use stjp, a reference to the job producer instantiated during node
-    // creation
-    while ((job = (SynthJob) stjp.getNextJob()) != null) {
-      ReservationId reservationId = null;
-      if (job.hasDeadline()) {
-        reservationId = ReservationId
-            .newInstance(rm.getStartTime(), AM_ID);
-      }
-      AMDefinitionSynth amDef = AMDefinitionFactory.createFromSynth(job, this);
-      startAMs(amDef, reservationId, job.getParams(), job.getDeadline());
-    }
-  }
-
   void increaseQueueAppNum(String queue) throws YarnException {
     SchedulerWrapper wrapper = (SchedulerWrapper)rm.getResourceScheduler();
     String queueName = wrapper.getRealQueueName(queue);
@@ -575,43 +451,12 @@ public class SLSRunner extends Configured implements Tool {
     }
   }
 
-  private AMSimulator createAmSimulator(String jobType) {
-    return (AMSimulator) ReflectionUtils.newInstance(
-          amClassMap.get(jobType), new Configuration());
-  }
-
-  private void runNewAM(JobDefinition jobDef) {
-    AMDefinition amDef = jobDef.getAmDefinition();
-    String oldJobId = amDef.getOldAppId();
-    AMSimulator amSim =
-        createAmSimulator(amDef.getAmType());
-
-    if (amSim != null) {
-      int heartbeatInterval = getConf().getInt(
-          SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS,
-          SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
-      boolean isTracked = trackedApps.contains(oldJobId);
-
-      if (oldJobId == null) {
-        oldJobId = Integer.toString(AM_ID);
-      }
-      AM_ID++;
-      amSim.init(amDef, rm, this, isTracked, runner.getStartTimeMS(), heartbeatInterval, appIdAMSim);
-      if (jobDef.getReservationId() != null) {
-        // if we have a ReservationId, delegate reservation creation to
-        // AMSim (reservation shape is impl specific)
-        UTCClock clock = new UTCClock();
-        amSim.initReservation(jobDef.getReservationId(), jobDef.getDeadline(),
-            clock.getTime());
-      }
-      runner.schedule(amSim);
-      maxRuntime = Math.max(maxRuntime, amDef.getJobFinishTime());
-      numTasks += amDef.getTaskContainers().size();
-      amMap.put(oldJobId, amSim);
-    }
-  }
-
   private void printSimulationInfo() {
+    final int numAMs = amRunner.getNumAMs();
+    final int numTasks = amRunner.getNumTasks();
+    final long maxRuntime = amRunner.getMaxRuntime();
+    Map<String, AMSimulator> amMap = amRunner.getAmMap();
+
     if (printSimulation) {
       // node
       LOG.info("------------------------------------");
@@ -663,7 +508,10 @@ public class SLSRunner extends Configured implements Tool {
   }
 
   public static void decreaseRemainingApps() {
-    remainingApps--;
+    AMRunner.REMAINING_APPS--;
+    if (AMRunner.REMAINING_APPS == 0) {
+      exitSLSRunner();
+    }
   }
 
   public static void exitSLSRunner() {
@@ -854,4 +702,8 @@ public class SLSRunner extends Configured implements Tool {
   public SynthTraceJobProducer getStjp() {
     return stjp;
   }
+
+  public AMSimulator getAMSimulatorByAppId(ApplicationId appId) {
+    return amRunner.getAMSimulator(appId);
+  }
 }
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
index d284076..e46dea5 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
@@ -44,15 +44,11 @@ public class MockAMLauncher extends ApplicationMasterLauncher
   private static final Logger LOG = LoggerFactory.getLogger(
       MockAMLauncher.class);
 
-  private Map<ApplicationId, AMSimulator> appIdAMSim;
+  private SLSRunner slsRunner;
 
-  SLSRunner se;
-
-  public MockAMLauncher(SLSRunner se, RMContext rmContext,
-      Map<ApplicationId, AMSimulator> appIdAMSim) {
+  public MockAMLauncher(SLSRunner slsRunner, RMContext rmContext) {
     super(rmContext);
-    this.appIdAMSim = appIdAMSim;
-    this.se = se;
+    this.slsRunner = slsRunner;
   }
 
   @Override
@@ -79,12 +75,11 @@ public class MockAMLauncher extends ApplicationMasterLauncher
   }
 
   @Override
-  @SuppressWarnings("unchecked")
   public void handle(AMLauncherEvent event) {
     ApplicationId appId =
         event.getAppAttempt().getAppAttemptId().getApplicationId();
     // find AMSimulator
-    AMSimulator ams = appIdAMSim.get(appId);
+    AMSimulator ams = slsRunner.getAMSimulatorByAppId(appId);
     if (ams == null) {
       throw new YarnRuntimeException(
           "Didn't find any AMSimulator for applicationId=" + appId);
@@ -103,7 +98,7 @@ public class MockAMLauncher extends ApplicationMasterLauncher
             event.getAppAttempt().getMasterContainer());
         LOG.info("Notify AM launcher launched:" + amContainer.getId());
 
-        se.getNmMap().get(amContainer.getNodeId())
+        slsRunner.getNmMap().get(amContainer.getNodeId())
             .addNewContainer(amContainer, -1, appId);
         ams.getRanNodes().add(amContainer.getNodeId());
         return;
@@ -111,7 +106,7 @@ public class MockAMLauncher extends ApplicationMasterLauncher
         throw new YarnRuntimeException(e);
       }
     case CLEANUP:
-      se.getNmMap().get(amContainer.getNodeId())
+      slsRunner.getNmMap().get(amContainer.getNodeId())
           .cleanupContainer(amContainer.getId());
       break;
     default:

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org