You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by qu...@apache.org on 2022/03/25 17:49:26 UTC
[hadoop] branch trunk updated: YARN-10548. Decouple AM runner logic from SLSRunner. Contributed by Szilard Nemeth.
This is an automated email from the ASF dual-hosted git repository.
quapaw pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 08a77a7 YARN-10548. Decouple AM runner logic from SLSRunner. Contributed by Szilard Nemeth.
08a77a7 is described below
commit 08a77a765ba635da1cc44f36b103116605a517ee
Author: 9uapaw <gy...@gmail.com>
AuthorDate: Fri Mar 25 18:48:56 2022 +0100
YARN-10548. Decouple AM runner logic from SLSRunner. Contributed by Szilard Nemeth.
---
.../java/org/apache/hadoop/yarn/sls/AMRunner.java | 297 +++++++++++++++++++++
.../java/org/apache/hadoop/yarn/sls/SLSRunner.java | 250 ++++-------------
.../yarn/sls/resourcemanager/MockAMLauncher.java | 17 +-
3 files changed, 354 insertions(+), 210 deletions(-)
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java
new file mode 100644
index 0000000..da95c68
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.rumen.JobTraceReader;
+import org.apache.hadoop.tools.rumen.LoggedJob;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.sls.SLSRunner.TraceType;
+import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
+import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
+import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
+import org.apache.hadoop.yarn.util.UTCClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class AMRunner {
+ private static final Logger LOG = LoggerFactory.getLogger(AMRunner.class);
+ // Number of applications still outstanding. startAM() sets it to the number
+ // of AMs created; SLSRunner reads it in getRemainingApps() and decrements it
+ // in decreaseRemainingApps(). Plain static int: no synchronization is
+ // visible here. NOTE(review): confirm which threads touch it.
+ static int REMAINING_APPS = 0;
+
+ // Configuration captured from slsRunner.getConf() in the constructor.
+ private final Configuration conf;
+ // Running counter, incremented once per AM created in runNewAM(); also used
+ // as the fallback job id and as the id part of synthetic reservation ids.
+ private int AM_ID;
+ // Job id (old app id, or the AM_ID fallback) -> AM simulator, filled by runNewAM().
+ private Map<String, AMSimulator> amMap;
+ // ApplicationId -> AM simulator. This class only creates the map, hands it
+ // to AMSimulator.init() and reads it in getAMSimulator(); presumably the
+ // simulators register themselves in it - verify in AMSimulator.
+ private Map<ApplicationId, AMSimulator> appIdAMSim;
+ // Job ids to track, supplied through setTrackedApps().
+ private Set<String> trackedApps;
+ // AM type name -> simulator class, built in init() from configuration keys
+ // starting with SLSConfiguration.AM_TYPE_PREFIX.
+ private Map<String, Class> amClassMap;
+ // Kind of input trace (SLS, RUMEN or SYNTH); selects the loader in startAM().
+ private TraceType inputType;
+ // Input trace locations, supplied through setInputTraces().
+ private String[] inputTraces;
+ // Synthetic job producer, created lazily in startAMFromSynthGenerator();
+ // this class has no setter for it.
+ private SynthTraceJobProducer stjp;
+ // Task runner on which every created AM simulator is scheduled.
+ private TaskRunner runner;
+ // Owning SLSRunner, passed on to the AM definition factory and the simulators.
+ private SLSRunner slsRunner;
+ // Statistics: numAMs is set at the end of startAM(); numTasks and maxRuntime
+ // are accumulated in runNewAM().
+ private int numAMs, numTasks;
+ private long maxRuntime;
+ // ResourceManager handed to the simulators; supplied through setResourceManager().
+ private ResourceManager rm;
+
+ /**
+ * Creates an AM runner bound to the given task runner and SLS runner.
+ *
+ * @param runner task runner used to schedule the AM simulators
+ * @param slsRunner owning SLS runner; its current configuration is captured
+ */
+ public AMRunner(TaskRunner runner, SLSRunner slsRunner) {
+ this.runner = runner;
+ this.slsRunner = slsRunner;
+ // NOTE(review): the configuration is read once, here. SLSRunner.init()
+ // constructs this object before it calls setConf(), so getConf() may still
+ // return an unset configuration at this point - confirm.
+ this.conf = slsRunner.getConf();
+ }
+
+
+ /**
+  * Resets the AM bookkeeping maps and builds the AM-type to simulator-class
+  * table from every configuration key that starts with
+  * {@link SLSConfiguration#AM_TYPE_PREFIX}.
+  *
+  * @param conf configuration to scan for AM type definitions
+  * @throws ClassNotFoundException if a configured simulator class cannot be
+  *     loaded
+  */
+ public void init(Configuration conf) throws ClassNotFoundException {
+   amMap = new ConcurrentHashMap<>();
+   amClassMap = new HashMap<>();
+   appIdAMSim = new ConcurrentHashMap<>();
+
+   final String typePrefix = SLSConfiguration.AM_TYPE_PREFIX;
+   for (Map.Entry<String, String> property : conf) {
+     final String propertyName = property.getKey();
+     if (!propertyName.startsWith(typePrefix)) {
+       continue;
+     }
+     // The remainder of the key after the prefix is the AM type name.
+     final String amTypeName = propertyName.substring(typePrefix.length());
+     amClassMap.put(amTypeName, Class.forName(conf.get(propertyName)));
+   }
+ }
+
+ /**
+  * Creates and schedules the AM simulators for all configured input traces,
+  * then records how many AMs exist, both locally and in
+  * {@code REMAINING_APPS}.
+  *
+  * @throws YarnException if the trace type is not SLS, RUMEN or SYNTH
+  * @throws IOException if a trace cannot be read
+  */
+ public void startAM() throws YarnException, IOException {
+   switch (inputType) {
+   case SYNTH:
+     startAMFromSynthGenerator();
+     break;
+   case RUMEN:
+     final long baselineTimeMS = 0;
+     for (String rumenTrace : inputTraces) {
+       startAMFromRumenTrace(rumenTrace, baselineTimeMS);
+     }
+     break;
+   case SLS:
+     for (String slsTrace : inputTraces) {
+       startAMFromSLSTrace(slsTrace);
+     }
+     break;
+   default:
+     throw new YarnException("Input configuration not recognized, "
+         + "trace type should be SLS, RUMEN, or SYNTH");
+   }
+
+   final int createdAMs = amMap.size();
+   numAMs = createdAMs;
+   REMAINING_APPS = createdAMs;
+ }
+
+ /**
+  * Parse workload from a SLS trace file.
+  *
+  * <p>The file is read as UTF-8 and treated as a sequence of JSON objects,
+  * each mapped to a {@code Map<String, String>} describing one job. A job
+  * that cannot be turned into an AM is logged and skipped so the rest of the
+  * trace is still processed.
+  *
+  * @param inputTrace path of the SLS trace file
+  * @throws IOException if the trace file cannot be opened or parsed
+  */
+ private void startAMFromSLSTrace(String inputTrace) throws IOException {
+   JsonFactory jsonF = new JsonFactory();
+   ObjectMapper mapper = new ObjectMapper();
+
+   try (Reader input = new InputStreamReader(
+       new FileInputStream(inputTrace), StandardCharsets.UTF_8)) {
+     JavaType type = mapper.getTypeFactory().
+         constructMapType(Map.class, String.class, String.class);
+     Iterator<Map<String, String>> jobIter = mapper.readValues(
+         jsonF.createParser(input), type);
+
+     while (jobIter.hasNext()) {
+       try {
+         Map<String, String> jsonJob = jobIter.next();
+         AMDefinitionSLS amDef =
+             AMDefinitionFactory.createFromSlsTrace(jsonJob, slsRunner);
+         startAMs(amDef);
+       } catch (Exception e) {
+         // Pass the throwable itself (not just its message) so the stack
+         // trace is kept, as the Rumen trace loader already does.
+         LOG.error("Failed to create an AM from SLS trace {}", inputTrace, e);
+       }
+     }
+   }
+ }
+
+ /**
+  * Parse workload information from synth-generator trace files.
+  *
+  * <p>Jobs are pulled from a {@link SynthTraceJobProducer} until it returns
+  * {@code null}. Jobs with a deadline get a reservation id built from the
+  * ResourceManager start time and the current AM counter.
+  *
+  * @throws YarnException declared for callers; not thrown directly here
+  * @throws IOException if the synthetic trace cannot be read
+  */
+ private void startAMFromSynthGenerator() throws YarnException, IOException {
+   // Reuse the producer SLSRunner may already have instantiated during node
+   // creation, so nodes and jobs come from the same producer.
+   if (stjp == null) {
+     stjp = slsRunner.getStjp();
+   }
+   // If a node file was used, no producer exists yet: create one now. The
+   // configuration is looked up at call time because the copy captured in
+   // the constructor may predate SLSRunner.setConf().
+   if (stjp == null) {
+     stjp = new SynthTraceJobProducer(slsRunner.getConf(),
+         new Path(inputTraces[0]));
+   }
+
+   SynthJob job;
+   while ((job = (SynthJob) stjp.getNextJob()) != null) {
+     ReservationId reservationId = null;
+     if (job.hasDeadline()) {
+       reservationId = ReservationId
+           .newInstance(rm.getStartTime(), AM_ID);
+     }
+     AMDefinitionSynth amDef =
+         AMDefinitionFactory.createFromSynth(job, slsRunner);
+     startAMs(amDef, reservationId, job.getParams(), job.getDeadline());
+   }
+ }
+
+ /**
+  * Parse workload from a rumen trace file.
+  *
+  * <p>The trace is read through a configuration whose default file system
+  * is the local one. A job that cannot be turned into an AM is logged and
+  * skipped.
+  *
+  * @param inputTrace path of the rumen trace file
+  * @param baselineTimeMS baseline time handed to the AM definition factory
+  * @throws IOException if the trace cannot be opened or read
+  */
+ private void startAMFromRumenTrace(String inputTrace, long baselineTimeMS)
+     throws IOException {
+   Configuration localFsConf = new Configuration();
+   localFsConf.set("fs.defaultFS", "file:///");
+   File traceFile = new File(inputTrace);
+
+   try (JobTraceReader traceReader = new JobTraceReader(
+       new Path(traceFile.getAbsolutePath()), localFsConf)) {
+     for (LoggedJob loggedJob = traceReader.getNext(); loggedJob != null;
+         loggedJob = traceReader.getNext()) {
+       try {
+         AMDefinitionRumen definition = AMDefinitionFactory
+             .createFromRumenTrace(loggedJob, baselineTimeMS, slsRunner);
+         startAMs(definition);
+       } catch (Exception e) {
+         LOG.error("Failed to create an AM", e);
+       }
+     }
+   }
+ }
+
+ /**
+  * Starts one AM per job of the definition, with no reservation, no extra
+  * parameters and a deadline of -1.
+  *
+  * @param amDef AM definition to start AMs for
+  */
+ private void startAMs(AMDefinition amDef) {
+   int started = 0;
+   while (started < amDef.getJobCount()) {
+     runNewAM(JobDefinition.Builder.create()
+         .withAmDefinition(amDef)
+         .withDeadline(-1)
+         .withReservationId(null)
+         .withParams(null)
+         .build());
+     started++;
+   }
+ }
+
+ /**
+  * Starts one AM per job of the definition, attaching the given
+  * reservation, parameters and deadline to each job definition.
+  *
+  * @param amDef AM definition to start AMs for
+  * @param reservationId reservation id for the jobs, may be {@code null}
+  * @param params job parameters
+  * @param deadline job deadline
+  */
+ private void startAMs(AMDefinition amDef,
+     ReservationId reservationId,
+     Map<String, String> params, long deadline) {
+   int started = 0;
+   while (started < amDef.getJobCount()) {
+     runNewAM(JobDefinition.Builder.create()
+         .withAmDefinition(amDef)
+         .withReservationId(reservationId)
+         .withParams(params)
+         .withDeadline(deadline)
+         .build());
+     started++;
+   }
+ }
+
+ /**
+  * Creates, initializes and schedules one AM simulator for the given job
+  * definition, and updates the runner statistics (max runtime, task count,
+  * AM map).
+  *
+  * @param jobDef job definition carrying the AM definition and, optionally,
+  *     a reservation id and deadline
+  */
+ private void runNewAM(JobDefinition jobDef) {
+   AMDefinition amDef = jobDef.getAmDefinition();
+   String oldJobId = amDef.getOldAppId();
+   AMSimulator amSim = createAmSimulator(amDef.getAmType());
+   if (amSim == null) {
+     return;
+   }
+
+   // Look the configuration up at call time: the copy captured in the
+   // constructor may predate SLSRunner.setConf().
+   int heartbeatInterval = slsRunner.getConf().getInt(
+       SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS,
+       SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
+   // A job without an original id cannot be in the tracked set; skip the
+   // lookup instead of querying the set with null.
+   boolean isTracked = oldJobId != null && trackedApps.contains(oldJobId);
+
+   if (oldJobId == null) {
+     // No id in the trace: fall back to the running AM counter.
+     oldJobId = Integer.toString(AM_ID);
+   }
+   AM_ID++;
+   amSim.init(amDef, rm, slsRunner, isTracked, runner.getStartTimeMS(),
+       heartbeatInterval, appIdAMSim);
+   if (jobDef.getReservationId() != null) {
+     // if we have a ReservationId, delegate reservation creation to
+     // AMSim (reservation shape is impl specific)
+     UTCClock clock = new UTCClock();
+     amSim.initReservation(jobDef.getReservationId(), jobDef.getDeadline(),
+         clock.getTime());
+   }
+   runner.schedule(amSim);
+   maxRuntime = Math.max(maxRuntime, amDef.getJobFinishTime());
+   numTasks += amDef.getTaskContainers().size();
+   amMap.put(oldJobId, amSim);
+ }
+
+ /**
+  * Instantiates the simulator class registered for the given AM type,
+  * handing it a fresh {@link Configuration}.
+  *
+  * @param jobType AM type name used as key into the AM class table
+  * @return the newly created simulator
+  */
+ private AMSimulator createAmSimulator(String jobType) {
+   Object simulator = ReflectionUtils.newInstance(
+       amClassMap.get(jobType), new Configuration());
+   return (AMSimulator) simulator;
+ }
+
+ /**
+ * Looks up the AM simulator registered for an application.
+ *
+ * @param appId application id to look up
+ * @return the simulator mapped to {@code appId}, or {@code null} if none
+ */
+ public AMSimulator getAMSimulator(ApplicationId appId) {
+ return appIdAMSim.get(appId);
+ }
+
+ /**
+ * @param inputType type of the input traces, used by startAM() to pick
+ * the trace loader
+ */
+ public void setInputType(TraceType inputType) {
+ this.inputType = inputType;
+ }
+
+ /**
+ * @param inputTraces input trace locations; the array is stored as is,
+ * without copying
+ */
+ public void setInputTraces(String[] inputTraces) {
+ this.inputTraces = inputTraces;
+ }
+
+ /**
+ * @param rm ResourceManager handed to every AM simulator on init
+ */
+ public void setResourceManager(ResourceManager rm) {
+ this.rm = rm;
+ }
+
+ /**
+ * @return the set of tracked job ids, exactly as passed to
+ * setTrackedApps (not a copy)
+ */
+ public Set<String> getTrackedApps() {
+ return trackedApps;
+ }
+
+ /**
+ * @param trackApps job ids whose AMs should be flagged as tracked
+ */
+ public void setTrackedApps(Set<String> trackApps) {
+ this.trackedApps = trackApps;
+ }
+
+ /**
+ * @return number of AMs recorded at the end of startAM()
+ */
+ public int getNumAMs() {
+ return numAMs;
+ }
+
+ /**
+ * @return total number of task containers over all AMs created so far
+ */
+ public int getNumTasks() {
+ return numTasks;
+ }
+
+ /**
+ * @return the largest job finish time over all AMs created so far
+ */
+ public long getMaxRuntime() {
+ return maxRuntime;
+ }
+
+ /**
+ * @return the live job id to AM simulator map (not a copy)
+ */
+ public Map<String, AMSimulator> getAmMap() {
+ return amMap;
+ }
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 260a600..48ad610 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -58,16 +58,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.TableMapping;
-import org.apache.hadoop.tools.rumen.JobTraceReader;
-import org.apache.hadoop.tools.rumen.LoggedJob;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -84,19 +80,32 @@ import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher;
import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler;
-import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
-import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler;
+import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
-import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
+import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
import org.apache.hadoop.yarn.sls.utils.SLSUtils;
-import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.security.Security;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
@Private
@Unstable
public class SLSRunner extends Configured implements Tool {
@@ -112,21 +121,12 @@ public class SLSRunner extends Configured implements Tool {
private Resource nodeManagerResource;
private String nodeFile;
- // AM simulator
- private int AM_ID;
- private Map<String, AMSimulator> amMap;
- private Map<ApplicationId, AMSimulator> appIdAMSim;
- private Set<String> trackedApps;
- private Map<String, Class> amClassMap;
- private static int remainingApps = 0;
-
// metrics
private String metricsOutputDir;
private boolean printSimulation;
// other simulation information
- private int numNMs, numRacks, numAMs, numTasks;
- private long maxRuntime;
+ private int numNMs, numRacks;
private String tableMapping;
private final static Map<String, Object> simulateInfoMap = new HashMap<>();
@@ -135,6 +135,7 @@ public class SLSRunner extends Configured implements Tool {
public final static Logger LOG = LoggerFactory.getLogger(SLSRunner.class);
private static boolean exitAtTheFinish = false;
+ private AMRunner amRunner;
/**
* The type of trace in input.
@@ -151,7 +152,7 @@ public class SLSRunner extends Configured implements Tool {
private SynthTraceJobProducer stjp;
public static int getRemainingApps() {
- return remainingApps;
+ return AMRunner.REMAINING_APPS;
}
public SLSRunner() throws ClassNotFoundException {
@@ -176,9 +177,7 @@ public class SLSRunner extends Configured implements Tool {
private void init(Configuration tempConf) throws ClassNotFoundException {
nmMap = new ConcurrentHashMap<>();
queueAppNumMap = new HashMap<>();
- amMap = new ConcurrentHashMap<>();
- amClassMap = new HashMap<>();
- appIdAMSim = new ConcurrentHashMap<>();
+ amRunner = new AMRunner(runner, this);
// runner configuration
setConf(tempConf);
@@ -186,15 +185,8 @@ public class SLSRunner extends Configured implements Tool {
poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE,
SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
SLSRunner.runner.setQueueSize(poolSize);
- // <AMType, Class> map
- for (Map.Entry e : tempConf) {
- String key = e.getKey().toString();
- if (key.startsWith(SLSConfiguration.AM_TYPE_PREFIX)) {
- String amType = key.substring(SLSConfiguration.AM_TYPE_PREFIX.length());
- amClassMap.put(amType, Class.forName(tempConf.get(key)));
- }
- }
+ amRunner.init(tempConf);
nodeManagerResource = getNodeManagerResource();
}
@@ -227,14 +219,25 @@ public class SLSRunner extends Configured implements Tool {
return Collections.unmodifiableMap(simulateInfoMap);
}
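+ /**
+ * Sets the simulation parameters. This is invoked before start.
+ * @param inType the type of the input traces
+ * @param inTraces the input traces; the array is cloned before it is stored
+ * @param nodes the node file
+ * @param outDir the metrics output directory; the table mapping file
+ * (tableMapping.csv) is also placed under it
+ * @param trackApps the set of applications to track
+ * @param printsimulation value stored in the printSimulation flag
+ */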
public void setSimulationParams(TraceType inType, String[] inTraces,
String nodes, String outDir, Set<String> trackApps,
boolean printsimulation) {
this.inputType = inType;
this.inputTraces = inTraces.clone();
+ this.amRunner.setInputType(this.inputType);
+ this.amRunner.setInputTraces(this.inputTraces);
+ this.amRunner.setTrackedApps(trackApps);
this.nodeFile = nodes;
- this.trackedApps = trackApps;
this.printSimulation = printsimulation;
metricsOutputDir = outDir;
tableMapping = outDir + "/tableMapping.csv";
@@ -247,15 +250,16 @@ public class SLSRunner extends Configured implements Tool {
// start resource manager
startRM();
+ amRunner.setResourceManager(rm);
// start node managers
startNM();
// start application masters
- startAM();
+ amRunner.startAM();
// set queue & tracked apps information
((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
.setQueueSet(this.queueAppNumMap.keySet());
((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
- .setTrackedAppSet(this.trackedApps);
+ .setTrackedAppSet(amRunner.getTrackedApps());
// print out simulation info
printSimulationInfo();
// blocked until all nodes RUNNING
@@ -310,7 +314,7 @@ public class SLSRunner extends Configured implements Tool {
rm = new ResourceManager() {
@Override
protected ApplicationMasterLauncher createAMLauncher() {
- return new MockAMLauncher(se, this.rmContext, appIdAMSim);
+ return new MockAMLauncher(se, this.rmContext);
}
};
@@ -422,109 +426,6 @@ public class SLSRunner extends Configured implements Tool {
System.currentTimeMillis() - startTimeMS);
}
- private void startAM() throws YarnException, IOException {
- switch (inputType) {
- case SLS:
- for (String inputTrace : inputTraces) {
- startAMFromSLSTrace(inputTrace);
- }
- break;
- case RUMEN:
- long baselineTimeMS = 0;
- for (String inputTrace : inputTraces) {
- startAMFromRumenTrace(inputTrace, baselineTimeMS);
- }
- break;
- case SYNTH:
- startAMFromSynthGenerator();
- break;
- default:
- throw new YarnException("Input configuration not recognized, "
- + "trace type should be SLS, RUMEN, or SYNTH");
- }
-
- numAMs = amMap.size();
- remainingApps = numAMs;
- }
-
- /**
- * Parse workload from a SLS trace file.
- */
- private void startAMFromSLSTrace(String inputTrace) throws IOException {
- JsonFactory jsonF = new JsonFactory();
- ObjectMapper mapper = new ObjectMapper();
-
- try (Reader input = new InputStreamReader(
- new FileInputStream(inputTrace), StandardCharsets.UTF_8)) {
- JavaType type = mapper.getTypeFactory().
- constructMapType(Map.class, String.class, String.class);
- Iterator<Map<String, String>> jobIter = mapper.readValues(
- jsonF.createParser(input), type);
-
- while (jobIter.hasNext()) {
- try {
- Map<String, String> jsonJob = jobIter.next();
- AMDefinitionSLS amDef = AMDefinitionFactory.createFromSlsTrace(jsonJob, this);
- startAMs(amDef);
- } catch (Exception e) {
- LOG.error("Failed to create an AM: {}", e.getMessage());
- }
- }
- }
- }
-
- private void startAMs(AMDefinition amDef) {
- for (int i = 0; i < amDef.getJobCount(); i++) {
- JobDefinition jobDef = JobDefinition.Builder.create()
- .withAmDefinition(amDef)
- .withDeadline(-1)
- .withReservationId(null)
- .withParams(null)
- .build();
- runNewAM(jobDef);
- }
- }
-
- private void startAMs(AMDefinition amDef, ReservationId reservationId,
- Map<String, String> params, long deadline) {
- for (int i = 0; i < amDef.getJobCount(); i++) {
- JobDefinition jobDef = JobDefinition.Builder.create()
- .withAmDefinition(amDef)
- .withReservationId(reservationId)
- .withParams(params)
- .withDeadline(deadline)
- .build();
- runNewAM(jobDef);
- }
- }
-
- /**
- * Parse workload from a rumen trace file.
- */
- private void startAMFromRumenTrace(String inputTrace, long baselineTimeMS)
- throws IOException {
- Configuration conf = new Configuration();
- conf.set("fs.defaultFS", "file:///");
- File fin = new File(inputTrace);
-
- try (JobTraceReader reader = new JobTraceReader(
- new Path(fin.getAbsolutePath()), conf)) {
- LoggedJob job = reader.getNext();
-
- while (job != null) {
- try {
- AMDefinitionRumen amDef =
- AMDefinitionFactory.createFromRumenTrace(job, baselineTimeMS,
- this);
- startAMs(amDef);
- } catch (Exception e) {
- LOG.error("Failed to create an AM", e);
- }
- job = reader.getNext();
- }
- }
- }
-
Resource getDefaultContainerResource() {
int containerMemory = getConf().getInt(SLSConfiguration.CONTAINER_MEMORY_MB,
SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT);
@@ -533,31 +434,6 @@ public class SLSRunner extends Configured implements Tool {
return Resources.createResource(containerMemory, containerVCores);
}
- /**
- * parse workload information from synth-generator trace files.
- */
- private void startAMFromSynthGenerator() throws YarnException, IOException {
- Configuration localConf = new Configuration();
- localConf.set("fs.defaultFS", "file:///");
- // if we use the nodeFile this could have been not initialized yet.
- if (stjp == null) {
- stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
- }
-
- SynthJob job;
- // we use stjp, a reference to the job producer instantiated during node
- // creation
- while ((job = (SynthJob) stjp.getNextJob()) != null) {
- ReservationId reservationId = null;
- if (job.hasDeadline()) {
- reservationId = ReservationId
- .newInstance(rm.getStartTime(), AM_ID);
- }
- AMDefinitionSynth amDef = AMDefinitionFactory.createFromSynth(job, this);
- startAMs(amDef, reservationId, job.getParams(), job.getDeadline());
- }
- }
-
void increaseQueueAppNum(String queue) throws YarnException {
SchedulerWrapper wrapper = (SchedulerWrapper)rm.getResourceScheduler();
String queueName = wrapper.getRealQueueName(queue);
@@ -575,43 +451,12 @@ public class SLSRunner extends Configured implements Tool {
}
}
- private AMSimulator createAmSimulator(String jobType) {
- return (AMSimulator) ReflectionUtils.newInstance(
- amClassMap.get(jobType), new Configuration());
- }
-
- private void runNewAM(JobDefinition jobDef) {
- AMDefinition amDef = jobDef.getAmDefinition();
- String oldJobId = amDef.getOldAppId();
- AMSimulator amSim =
- createAmSimulator(amDef.getAmType());
-
- if (amSim != null) {
- int heartbeatInterval = getConf().getInt(
- SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS,
- SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
- boolean isTracked = trackedApps.contains(oldJobId);
-
- if (oldJobId == null) {
- oldJobId = Integer.toString(AM_ID);
- }
- AM_ID++;
- amSim.init(amDef, rm, this, isTracked, runner.getStartTimeMS(), heartbeatInterval, appIdAMSim);
- if (jobDef.getReservationId() != null) {
- // if we have a ReservationId, delegate reservation creation to
- // AMSim (reservation shape is impl specific)
- UTCClock clock = new UTCClock();
- amSim.initReservation(jobDef.getReservationId(), jobDef.getDeadline(),
- clock.getTime());
- }
- runner.schedule(amSim);
- maxRuntime = Math.max(maxRuntime, amDef.getJobFinishTime());
- numTasks += amDef.getTaskContainers().size();
- amMap.put(oldJobId, amSim);
- }
- }
-
private void printSimulationInfo() {
+ final int numAMs = amRunner.getNumAMs();
+ final int numTasks = amRunner.getNumTasks();
+ final long maxRuntime = amRunner.getMaxRuntime();
+ Map<String, AMSimulator> amMap = amRunner.getAmMap();
+
if (printSimulation) {
// node
LOG.info("------------------------------------");
@@ -663,7 +508,10 @@ public class SLSRunner extends Configured implements Tool {
}
public static void decreaseRemainingApps() {
- remainingApps--;
+ AMRunner.REMAINING_APPS--;
+ if (AMRunner.REMAINING_APPS == 0) {
+ exitSLSRunner();
+ }
}
public static void exitSLSRunner() {
@@ -854,4 +702,8 @@ public class SLSRunner extends Configured implements Tool {
public SynthTraceJobProducer getStjp() {
return stjp;
}
+
+ /**
+ * Returns the AM simulator for the given application by delegating to
+ * the AM runner.
+ *
+ * @param appId application id to look up
+ * @return whatever amRunner.getAMSimulator(appId) returns
+ */
+ public AMSimulator getAMSimulatorByAppId(ApplicationId appId) {
+ return amRunner.getAMSimulator(appId);
+ }
}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
index d284076..e46dea5 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
@@ -44,15 +44,11 @@ public class MockAMLauncher extends ApplicationMasterLauncher
private static final Logger LOG = LoggerFactory.getLogger(
MockAMLauncher.class);
- private Map<ApplicationId, AMSimulator> appIdAMSim;
+ private SLSRunner slsRunner;
- SLSRunner se;
-
- public MockAMLauncher(SLSRunner se, RMContext rmContext,
- Map<ApplicationId, AMSimulator> appIdAMSim) {
+ public MockAMLauncher(SLSRunner slsRunner, RMContext rmContext) {
super(rmContext);
- this.appIdAMSim = appIdAMSim;
- this.se = se;
+ this.slsRunner = slsRunner;
}
@Override
@@ -79,12 +75,11 @@ public class MockAMLauncher extends ApplicationMasterLauncher
}
@Override
- @SuppressWarnings("unchecked")
public void handle(AMLauncherEvent event) {
ApplicationId appId =
event.getAppAttempt().getAppAttemptId().getApplicationId();
// find AMSimulator
- AMSimulator ams = appIdAMSim.get(appId);
+ AMSimulator ams = slsRunner.getAMSimulatorByAppId(appId);
if (ams == null) {
throw new YarnRuntimeException(
"Didn't find any AMSimulator for applicationId=" + appId);
@@ -103,7 +98,7 @@ public class MockAMLauncher extends ApplicationMasterLauncher
event.getAppAttempt().getMasterContainer());
LOG.info("Notify AM launcher launched:" + amContainer.getId());
- se.getNmMap().get(amContainer.getNodeId())
+ slsRunner.getNmMap().get(amContainer.getNodeId())
.addNewContainer(amContainer, -1, appId);
ams.getRanNodes().add(amContainer.getNodeId());
return;
@@ -111,7 +106,7 @@ public class MockAMLauncher extends ApplicationMasterLauncher
throw new YarnRuntimeException(e);
}
case CLEANUP:
- se.getNmMap().get(amContainer.getNodeId())
+ slsRunner.getNmMap().get(amContainer.getNodeId())
.cleanupContainer(amContainer.getId());
break;
default:
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org