You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bt...@apache.org on 2022/03/25 11:02:36 UTC
[hadoop] branch trunk updated: YARN-11094. Follow up changes for YARN-10547. Contributed by Szilard Nemeth
This is an automated email from the ASF dual-hosted git repository.
bteke pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new ffa0eab YARN-11094. Follow up changes for YARN-10547. Contributed by Szilard Nemeth
ffa0eab is described below
commit ffa0eab48867ebbb84a3d015a941b0c08fe0b61a
Author: Benjamin Teke <bt...@cloudera.com>
AuthorDate: Fri Mar 25 12:01:44 2022 +0100
YARN-11094. Follow up changes for YARN-10547. Contributed by Szilard Nemeth
---
.../hadoop/yarn/sls/AMDefinitionFactory.java | 6 ++--
.../apache/hadoop/yarn/sls/AMDefinitionSLS.java | 35 +++++++++---------
.../java/org/apache/hadoop/yarn/sls/SLSRunner.java | 35 +++++++++---------
.../hadoop/yarn/sls/TaskContainerDefinition.java | 41 +++++++++++-----------
.../yarn/sls/scheduler/SLSSchedulerCommons.java | 13 +++----
5 files changed, 61 insertions(+), 69 deletions(-)
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionFactory.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionFactory.java
index 2bbe7bb..61975f0 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionFactory.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionFactory.java
@@ -37,7 +37,7 @@ public final class AMDefinitionFactory {
private AMDefinitionFactory() {}
- public static AMDefinitionSLS createFromSlsTrace(Map<?, ?> jsonJob,
+ public static AMDefinitionSLS createFromSlsTrace(Map<String, String> jsonJob,
SLSRunner slsRunner) throws YarnException {
AMDefinitionSLS amDefinition = AMDefinitionSLS.Builder.create(jsonJob)
.withAmType(SLSConfiguration.AM_TYPE)
@@ -94,7 +94,7 @@ public final class AMDefinitionFactory {
return amDefinition;
}
- private static Resource getAMContainerResourceSLS(Map<?, ?> jsonJob,
+ private static Resource getAMContainerResourceSLS(Map<String, String> jsonJob,
Configured configured) {
Resource amContainerResource =
SLSConfiguration.getAMContainerResource(configured.getConf());
@@ -106,7 +106,7 @@ public final class AMDefinitionFactory {
for (ResourceInformation info : infors) {
String key = SLSConfiguration.JOB_AM_PREFIX + info.getName();
if (jsonJob.containsKey(key)) {
- long value = Long.parseLong(jsonJob.get(key).toString());
+ long value = Long.parseLong(jsonJob.get(key));
amContainerResource.setResourceValue(info.getName(), value);
}
}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSLS.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSLS.java
index 7439ddf..a84c924 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSLS.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSLS.java
@@ -36,19 +36,19 @@ public class AMDefinitionSLS extends AMDefinition {
return queue;
}
- public static List<ContainerSimulator> getTaskContainers(Map<?, ?> jsonJob,
+ public static List<ContainerSimulator> getTaskContainers(Map<String, ?> jsonJob,
SLSRunner slsRunner) throws YarnException {
- List<Map<?, ?>> tasks = (List) jsonJob.get(SLSConfiguration.JOB_TASKS);
+ List<Map<String, String>> tasks = (List) jsonJob.get(SLSConfiguration.JOB_TASKS);
if (tasks == null || tasks.size() == 0) {
throw new YarnException("No task for the job!");
}
List<ContainerSimulator> containers = new ArrayList<>();
- for (Map<?, ?> jsonTask : tasks) {
+ for (Map<String, String> jsonTask : tasks) {
TaskContainerDefinition containerDef =
TaskContainerDefinition.Builder.create()
.withCount(jsonTask, SLSConfiguration.COUNT)
- .withHostname((String) jsonTask.get(SLSConfiguration.TASK_HOST))
+ .withHostname(jsonTask.get(SLSConfiguration.TASK_HOST))
.withDuration(jsonTask, SLSConfiguration.TASK_DURATION_MS)
.withDurationLegacy(jsonTask, SLSConfiguration.DURATION_MS)
.withTaskStart(jsonTask, SLSConfiguration.TASK_START_MS)
@@ -69,15 +69,14 @@ public class AMDefinitionSLS extends AMDefinition {
return containers;
}
- private static Resource getResourceForContainer(Map<?, ?> jsonTask,
+ private static Resource getResourceForContainer(Map<String, String> jsonTask,
SLSRunner slsRunner) {
Resource res = slsRunner.getDefaultContainerResource();
ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
for (ResourceInformation info : infors) {
if (jsonTask.containsKey(SLSConfiguration.TASK_PREFIX + info.getName())) {
long value = Long.parseLong(
- jsonTask.get(SLSConfiguration.TASK_PREFIX + info.getName())
- .toString());
+ jsonTask.get(SLSConfiguration.TASK_PREFIX + info.getName()));
res.setResourceValue(info.getName(), value);
}
}
@@ -85,19 +84,19 @@ public class AMDefinitionSLS extends AMDefinition {
}
public static final class Builder extends AmDefinitionBuilder {
- private final Map<?, ?> jsonJob;
+ private final Map<String, String> jsonJob;
- private Builder(Map<?, ?> jsonJob) {
+ private Builder(Map<String, String> jsonJob) {
this.jsonJob = jsonJob;
}
- public static Builder create(Map<?, ?> jsonJob) {
+ public static Builder create(Map<String, String> jsonJob) {
return new Builder(jsonJob);
}
public Builder withAmType(String key) {
if (jsonJob.containsKey(key)) {
- String amType = (String) jsonJob.get(key);
+ String amType = jsonJob.get(key);
if (amType != null) {
this.amType = amType;
}
@@ -107,7 +106,7 @@ public class AMDefinitionSLS extends AMDefinition {
public Builder withUser(String key) {
if (jsonJob.containsKey(key)) {
- String user = (String) jsonJob.get(key);
+ String user = jsonJob.get(key);
if (user != null) {
this.user = user;
}
@@ -117,21 +116,21 @@ public class AMDefinitionSLS extends AMDefinition {
public Builder withQueue(String key) {
if (jsonJob.containsKey(key)) {
- this.queue = jsonJob.get(key).toString();
+ this.queue = jsonJob.get(key);
}
return this;
}
public Builder withJobId(String key) {
if (jsonJob.containsKey(key)) {
- this.jobId = (String) jsonJob.get(key);
+ this.jobId = jsonJob.get(key);
}
return this;
}
public Builder withJobCount(String key) {
if (jsonJob.containsKey(key)) {
- jobCount = Integer.parseInt(jsonJob.get(key).toString());
+ jobCount = Integer.parseInt(jsonJob.get(key));
jobCount = Math.max(jobCount, 1);
}
return this;
@@ -139,21 +138,21 @@ public class AMDefinitionSLS extends AMDefinition {
public Builder withJobStartTime(String key) {
if (jsonJob.containsKey(key)) {
- this.jobStartTime = Long.parseLong(jsonJob.get(key).toString());
+ this.jobStartTime = Long.parseLong(jsonJob.get(key));
}
return this;
}
public Builder withJobFinishTime(String key) {
if (jsonJob.containsKey(key)) {
- this.jobFinishTime = Long.parseLong(jsonJob.get(key).toString());
+ this.jobFinishTime = Long.parseLong(jsonJob.get(key));
}
return this;
}
public Builder withLabelExpression(String key) {
if (jsonJob.containsKey(key)) {
- this.labelExpression = jsonJob.get(key).toString();
+ this.labelExpression = jsonJob.get(key);
}
return this;
}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 83834e8..260a600 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -22,12 +22,14 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
+import java.nio.charset.StandardCharsets;
import java.security.Security;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
+import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.Collections;
@@ -37,6 +39,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.cli.CommandLine;
@@ -126,8 +129,7 @@ public class SLSRunner extends Configured implements Tool {
private long maxRuntime;
private String tableMapping;
- private final static Map<String, Object> simulateInfoMap =
- new HashMap<String, Object>();
+ private final static Map<String, Object> simulateInfoMap = new HashMap<>();
// logger
public final static Logger LOG = LoggerFactory.getLogger(SLSRunner.class);
@@ -227,7 +229,7 @@ public class SLSRunner extends Configured implements Tool {
public void setSimulationParams(TraceType inType, String[] inTraces,
String nodes, String outDir, Set<String> trackApps,
- boolean printsimulation) throws IOException, ClassNotFoundException {
+ boolean printsimulation) {
this.inputType = inType;
this.inputTraces = inTraces.clone();
@@ -420,7 +422,6 @@ public class SLSRunner extends Configured implements Tool {
System.currentTimeMillis() - startTimeMS);
}
- @SuppressWarnings("unchecked")
private void startAM() throws YarnException, IOException {
switch (inputType) {
case SLS:
@@ -449,21 +450,21 @@ public class SLSRunner extends Configured implements Tool {
/**
* Parse workload from a SLS trace file.
*/
- @SuppressWarnings("unchecked")
private void startAMFromSLSTrace(String inputTrace) throws IOException {
JsonFactory jsonF = new JsonFactory();
ObjectMapper mapper = new ObjectMapper();
try (Reader input = new InputStreamReader(
- new FileInputStream(inputTrace), "UTF-8")) {
- Iterator<Map> jobIter = mapper.readValues(
- jsonF.createParser(input), Map.class);
+ new FileInputStream(inputTrace), StandardCharsets.UTF_8)) {
+ JavaType type = mapper.getTypeFactory().
+ constructMapType(Map.class, String.class, String.class);
+ Iterator<Map<String, String>> jobIter = mapper.readValues(
+ jsonF.createParser(input), type);
while (jobIter.hasNext()) {
try {
- Map jsonJob = jobIter.next();
- AMDefinitionSLS amDef = AMDefinitionFactory.createFromSlsTrace(
- jsonJob, this);
+ Map<String, String> jsonJob = jobIter.next();
+ AMDefinitionSLS amDef = AMDefinitionFactory.createFromSlsTrace(jsonJob, this);
startAMs(amDef);
} catch (Exception e) {
LOG.error("Failed to create an AM: {}", e.getMessage());
@@ -500,7 +501,6 @@ public class SLSRunner extends Configured implements Tool {
/**
* Parse workload from a rumen trace file.
*/
- @SuppressWarnings("unchecked")
private void startAMFromRumenTrace(String inputTrace, long baselineTimeMS)
throws IOException {
Configuration conf = new Configuration();
@@ -536,7 +536,6 @@ public class SLSRunner extends Configured implements Tool {
/**
* parse workload information from synth-generator trace files.
*/
- @SuppressWarnings("unchecked")
private void startAMFromSynthGenerator() throws YarnException, IOException {
Configuration localConf = new Configuration();
localConf.set("fs.defaultFS", "file:///");
@@ -729,17 +728,17 @@ public class SLSRunner extends Configured implements Tool {
throw new YarnException("Cannot create output directory");
}
- Set<String> trackedJobSet = new HashSet<String>();
+ Set<String> trackedJobSet = new HashSet<>();
if (cmd.hasOption("trackjobs")) {
String trackjobs = cmd.getOptionValue("trackjobs");
- String jobIds[] = trackjobs.split(",");
+ String[] jobIds = trackjobs.split(",");
trackedJobSet.addAll(Arrays.asList(jobIds));
}
String tempNodeFile =
cmd.hasOption("nodes") ? cmd.getOptionValue("nodes") : "";
- TraceType tempTraceType = TraceType.SLS;
+ TraceType tempTraceType;
switch (traceType) {
case "SLS":
tempTraceType = TraceType.SLS;
@@ -834,9 +833,7 @@ public class SLSRunner extends Configured implements Tool {
NodeDetails that = (NodeDetails) o;
return StringUtils.equals(hostname, that.hostname) && (
- nodeResource == null ?
- that.nodeResource == null :
- nodeResource.equals(that.nodeResource)) && SetUtils
+ Objects.equals(nodeResource, that.nodeResource)) && SetUtils
.isEqualSet(labels, that.labels);
}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/TaskContainerDefinition.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/TaskContainerDefinition.java
index 1b0cd90..04bea3a 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/TaskContainerDefinition.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/TaskContainerDefinition.java
@@ -88,9 +88,9 @@ public class TaskContainerDefinition {
return new Builder();
}
- public Builder withDuration(Map<?, ?> jsonTask, String key) {
+ public Builder withDuration(Map<String, String> jsonTask, String key) {
if (jsonTask.containsKey(key)) {
- this.duration = Integer.parseInt(jsonTask.get(key).toString());
+ this.duration = Integer.parseInt(jsonTask.get(key));
}
return this;
}
@@ -106,23 +106,23 @@ public class TaskContainerDefinition {
* @param key The json key.
* @return the builder
*/
- public Builder withDurationLegacy(Map<?, ?> jsonTask, String key) {
+ public Builder withDurationLegacy(Map<String, String> jsonTask, String key) {
if (jsonTask.containsKey(key)) {
- this.durationLegacy = Integer.parseInt(jsonTask.get(key).toString());
+ this.durationLegacy = Integer.parseInt(jsonTask.get(key));
}
return this;
}
- public Builder withTaskStart(Map<?, ?> jsonTask, String key) {
+ public Builder withTaskStart(Map<String, String> jsonTask, String key) {
if (jsonTask.containsKey(key)) {
- this.taskStart = Long.parseLong(jsonTask.get(key).toString());
+ this.taskStart = Long.parseLong(jsonTask.get(key));
}
return this;
}
- public Builder withTaskFinish(Map<?, ?> jsonTask, String key) {
+ public Builder withTaskFinish(Map<String, String> jsonTask, String key) {
if (jsonTask.containsKey(key)) {
- this.taskFinish = Long.parseLong(jsonTask.get(key).toString());
+ this.taskFinish = Long.parseLong(jsonTask.get(key));
}
return this;
}
@@ -132,9 +132,9 @@ public class TaskContainerDefinition {
return this;
}
- public Builder withPriority(Map<?, ?> jsonTask, String key) {
+ public Builder withPriority(Map<String, String> jsonTask, String key) {
if (jsonTask.containsKey(key)) {
- this.priority = Integer.parseInt(jsonTask.get(key).toString());
+ this.priority = Integer.parseInt(jsonTask.get(key));
}
return this;
}
@@ -144,9 +144,9 @@ public class TaskContainerDefinition {
return this;
}
- public Builder withType(Map<?, ?> jsonTask, String key) {
+ public Builder withType(Map<String, String> jsonTask, String key) {
if (jsonTask.containsKey(key)) {
- this.type = jsonTask.get(key).toString();
+ this.type = jsonTask.get(key);
}
return this;
}
@@ -156,9 +156,9 @@ public class TaskContainerDefinition {
return this;
}
- public Builder withCount(Map<?, ?> jsonTask, String key) {
+ public Builder withCount(Map<String, String> jsonTask, String key) {
if (jsonTask.containsKey(key)) {
- count = Integer.parseInt(jsonTask.get(key).toString());
+ count = Integer.parseInt(jsonTask.get(key));
count = Math.max(count, 1);
}
return this;
@@ -169,10 +169,9 @@ public class TaskContainerDefinition {
return this;
}
- public Builder withExecutionType(Map<?, ?> jsonTask, String key) {
+ public Builder withExecutionType(Map<String, String> jsonTask, String key) {
if (jsonTask.containsKey(key)) {
- this.executionType = ExecutionType.valueOf(
- jsonTask.get(key).toString());
+ this.executionType = ExecutionType.valueOf(jsonTask.get(key));
}
return this;
}
@@ -182,9 +181,9 @@ public class TaskContainerDefinition {
return this;
}
- public Builder withAllocationId(Map<?, ?> jsonTask, String key) {
+ public Builder withAllocationId(Map<String, String> jsonTask, String key) {
if (jsonTask.containsKey(key)) {
- this.allocationId = Long.parseLong(jsonTask.get(key).toString());
+ this.allocationId = Long.parseLong(jsonTask.get(key));
}
return this;
}
@@ -194,9 +193,9 @@ public class TaskContainerDefinition {
return this;
}
- public Builder withRequestDelay(Map<?, ?> jsonTask, String key) {
+ public Builder withRequestDelay(Map<String, String> jsonTask, String key) {
if (jsonTask.containsKey(key)) {
- requestDelay = Long.parseLong(jsonTask.get(key).toString());
+ requestDelay = Long.parseLong(jsonTask.get(key));
requestDelay = Math.max(requestDelay, 0);
}
return this;
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSSchedulerCommons.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSSchedulerCommons.java
index 92aa960..7132fc9 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSSchedulerCommons.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSSchedulerCommons.java
@@ -54,22 +54,19 @@ import java.util.concurrent.ConcurrentHashMap;
public class SLSSchedulerCommons {
private static final Logger LOG = LoggerFactory.getLogger(SLSSchedulerCommons.class);
- private AbstractYarnScheduler scheduler;
+ private final AbstractYarnScheduler<?, ?> scheduler;
private boolean metricsON;
private SchedulerMetrics schedulerMetrics;
- private Map<ContainerId, Resource> preemptionContainerMap =
- new ConcurrentHashMap<>();
-
- private Map<ApplicationAttemptId, String> appQueueMap =
- new ConcurrentHashMap<>();
- private Tracker tracker;
+ private final Map<ContainerId, Resource> preemptionContainerMap = new ConcurrentHashMap<>();
+ private final Map<ApplicationAttemptId, String> appQueueMap = new ConcurrentHashMap<>();
+ private final Tracker tracker;
public SLSSchedulerCommons(AbstractYarnScheduler scheduler) {
this.scheduler = scheduler;
this.tracker = new Tracker();
}
- public void initMetrics(Class<? extends AbstractYarnScheduler> schedulerClass, Configuration conf) {
+ public void initMetrics(Class<? extends AbstractYarnScheduler<?, ?>> schedulerClass, Configuration conf) {
metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true);
if (metricsON) {
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org