You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by bt...@apache.org on 2022/03/25 11:02:36 UTC

[hadoop] branch trunk updated: YARN-11094. Follow up changes for YARN-10547. Contributed by Szilard Nemeth

This is an automated email from the ASF dual-hosted git repository.

bteke pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ffa0eab  YARN-11094. Follow up changes for YARN-10547. Contributed by Szilard Nemeth
ffa0eab is described below

commit ffa0eab48867ebbb84a3d015a941b0c08fe0b61a
Author: Benjamin Teke <bt...@cloudera.com>
AuthorDate: Fri Mar 25 12:01:44 2022 +0100

    YARN-11094. Follow up changes for YARN-10547. Contributed by Szilard Nemeth
---
 .../hadoop/yarn/sls/AMDefinitionFactory.java       |  6 ++--
 .../apache/hadoop/yarn/sls/AMDefinitionSLS.java    | 35 +++++++++---------
 .../java/org/apache/hadoop/yarn/sls/SLSRunner.java | 35 +++++++++---------
 .../hadoop/yarn/sls/TaskContainerDefinition.java   | 41 +++++++++++-----------
 .../yarn/sls/scheduler/SLSSchedulerCommons.java    | 13 +++----
 5 files changed, 61 insertions(+), 69 deletions(-)

diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionFactory.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionFactory.java
index 2bbe7bb..61975f0 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionFactory.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionFactory.java
@@ -37,7 +37,7 @@ public final class AMDefinitionFactory {
 
   private AMDefinitionFactory() {}
 
-  public static AMDefinitionSLS createFromSlsTrace(Map<?, ?> jsonJob,
+  public static AMDefinitionSLS createFromSlsTrace(Map<String, String> jsonJob,
       SLSRunner slsRunner) throws YarnException {
     AMDefinitionSLS amDefinition = AMDefinitionSLS.Builder.create(jsonJob)
         .withAmType(SLSConfiguration.AM_TYPE)
@@ -94,7 +94,7 @@ public final class AMDefinitionFactory {
     return amDefinition;
   }
 
-  private static Resource getAMContainerResourceSLS(Map<?, ?> jsonJob,
+  private static Resource getAMContainerResourceSLS(Map<String, String> jsonJob,
       Configured configured) {
     Resource amContainerResource =
         SLSConfiguration.getAMContainerResource(configured.getConf());
@@ -106,7 +106,7 @@ public final class AMDefinitionFactory {
     for (ResourceInformation info : infors) {
       String key = SLSConfiguration.JOB_AM_PREFIX + info.getName();
       if (jsonJob.containsKey(key)) {
-        long value = Long.parseLong(jsonJob.get(key).toString());
+        long value = Long.parseLong(jsonJob.get(key));
         amContainerResource.setResourceValue(info.getName(), value);
       }
     }
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSLS.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSLS.java
index 7439ddf..a84c924 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSLS.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSLS.java
@@ -36,19 +36,19 @@ public class AMDefinitionSLS extends AMDefinition {
     return queue;
   }
 
-  public static List<ContainerSimulator> getTaskContainers(Map<?, ?> jsonJob,
+  public static List<ContainerSimulator> getTaskContainers(Map<String, ?> jsonJob,
       SLSRunner slsRunner) throws YarnException {
-    List<Map<?, ?>> tasks = (List) jsonJob.get(SLSConfiguration.JOB_TASKS);
+    List<Map<String, String>> tasks = (List) jsonJob.get(SLSConfiguration.JOB_TASKS);
     if (tasks == null || tasks.size() == 0) {
       throw new YarnException("No task for the job!");
     }
 
     List<ContainerSimulator> containers = new ArrayList<>();
-    for (Map<?, ?> jsonTask : tasks) {
+    for (Map<String, String> jsonTask : tasks) {
       TaskContainerDefinition containerDef =
           TaskContainerDefinition.Builder.create()
               .withCount(jsonTask, SLSConfiguration.COUNT)
-              .withHostname((String) jsonTask.get(SLSConfiguration.TASK_HOST))
+              .withHostname(jsonTask.get(SLSConfiguration.TASK_HOST))
               .withDuration(jsonTask, SLSConfiguration.TASK_DURATION_MS)
               .withDurationLegacy(jsonTask, SLSConfiguration.DURATION_MS)
               .withTaskStart(jsonTask, SLSConfiguration.TASK_START_MS)
@@ -69,15 +69,14 @@ public class AMDefinitionSLS extends AMDefinition {
     return containers;
   }
 
-  private static Resource getResourceForContainer(Map<?, ?> jsonTask,
+  private static Resource getResourceForContainer(Map<String, String> jsonTask,
       SLSRunner slsRunner) {
     Resource res = slsRunner.getDefaultContainerResource();
     ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
     for (ResourceInformation info : infors) {
       if (jsonTask.containsKey(SLSConfiguration.TASK_PREFIX + info.getName())) {
         long value = Long.parseLong(
-            jsonTask.get(SLSConfiguration.TASK_PREFIX + info.getName())
-                .toString());
+            jsonTask.get(SLSConfiguration.TASK_PREFIX + info.getName()));
         res.setResourceValue(info.getName(), value);
       }
     }
@@ -85,19 +84,19 @@ public class AMDefinitionSLS extends AMDefinition {
   }
 
   public static final class Builder extends AmDefinitionBuilder {
-    private final Map<?, ?> jsonJob;
+    private final Map<String, String> jsonJob;
 
-    private Builder(Map<?, ?> jsonJob) {
+    private Builder(Map<String, String> jsonJob) {
       this.jsonJob = jsonJob;
     }
 
-    public static Builder create(Map<?, ?> jsonJob) {
+    public static Builder create(Map<String, String> jsonJob) {
       return new Builder(jsonJob);
     }
 
     public Builder withAmType(String key) {
       if (jsonJob.containsKey(key)) {
-        String amType = (String) jsonJob.get(key);
+        String amType = jsonJob.get(key);
         if (amType != null) {
           this.amType = amType;
         }
@@ -107,7 +106,7 @@ public class AMDefinitionSLS extends AMDefinition {
 
     public Builder withUser(String key) {
       if (jsonJob.containsKey(key)) {
-        String user = (String) jsonJob.get(key);
+        String user = jsonJob.get(key);
         if (user != null) {
           this.user = user;
         }
@@ -117,21 +116,21 @@ public class AMDefinitionSLS extends AMDefinition {
 
     public Builder withQueue(String key) {
       if (jsonJob.containsKey(key)) {
-        this.queue = jsonJob.get(key).toString();
+        this.queue = jsonJob.get(key);
       }
       return this;
     }
 
     public Builder withJobId(String key) {
       if (jsonJob.containsKey(key)) {
-        this.jobId = (String) jsonJob.get(key);
+        this.jobId = jsonJob.get(key);
       }
       return this;
     }
 
     public Builder withJobCount(String key) {
       if (jsonJob.containsKey(key)) {
-        jobCount = Integer.parseInt(jsonJob.get(key).toString());
+        jobCount = Integer.parseInt(jsonJob.get(key));
         jobCount = Math.max(jobCount, 1);
       }
       return this;
@@ -139,21 +138,21 @@ public class AMDefinitionSLS extends AMDefinition {
 
     public Builder withJobStartTime(String key) {
       if (jsonJob.containsKey(key)) {
-        this.jobStartTime = Long.parseLong(jsonJob.get(key).toString());
+        this.jobStartTime = Long.parseLong(jsonJob.get(key));
       }
       return this;
     }
 
     public Builder withJobFinishTime(String key) {
       if (jsonJob.containsKey(key)) {
-        this.jobFinishTime = Long.parseLong(jsonJob.get(key).toString());
+        this.jobFinishTime = Long.parseLong(jsonJob.get(key));
       }
       return this;
     }
 
     public Builder withLabelExpression(String key) {
       if (jsonJob.containsKey(key)) {
-        this.labelExpression = jsonJob.get(key).toString();
+        this.labelExpression = jsonJob.get(key);
       }
       return this;
     }
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 83834e8..260a600 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -22,12 +22,14 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.Reader;
+import java.nio.charset.StandardCharsets;
 import java.security.Security;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
 import java.util.Collections;
@@ -37,6 +39,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.commons.cli.CommandLine;
@@ -126,8 +129,7 @@ public class SLSRunner extends Configured implements Tool {
   private long maxRuntime;
   private String tableMapping;
 
-  private final static Map<String, Object> simulateInfoMap =
-          new HashMap<String, Object>();
+  private final static Map<String, Object> simulateInfoMap = new HashMap<>();
 
   // logger
   public final static Logger LOG = LoggerFactory.getLogger(SLSRunner.class);
@@ -227,7 +229,7 @@ public class SLSRunner extends Configured implements Tool {
 
   public void setSimulationParams(TraceType inType, String[] inTraces,
       String nodes, String outDir, Set<String> trackApps,
-      boolean printsimulation) throws IOException, ClassNotFoundException {
+      boolean printsimulation) {
 
     this.inputType = inType;
     this.inputTraces = inTraces.clone();
@@ -420,7 +422,6 @@ public class SLSRunner extends Configured implements Tool {
         System.currentTimeMillis() - startTimeMS);
   }
 
-  @SuppressWarnings("unchecked")
   private void startAM() throws YarnException, IOException {
     switch (inputType) {
     case SLS:
@@ -449,21 +450,21 @@ public class SLSRunner extends Configured implements Tool {
   /**
    * Parse workload from a SLS trace file.
    */
-  @SuppressWarnings("unchecked")
   private void startAMFromSLSTrace(String inputTrace) throws IOException {
     JsonFactory jsonF = new JsonFactory();
     ObjectMapper mapper = new ObjectMapper();
 
     try (Reader input = new InputStreamReader(
-        new FileInputStream(inputTrace), "UTF-8")) {
-      Iterator<Map> jobIter = mapper.readValues(
-          jsonF.createParser(input), Map.class);
+        new FileInputStream(inputTrace), StandardCharsets.UTF_8)) {
+      JavaType type = mapper.getTypeFactory().
+          constructMapType(Map.class, String.class, String.class);
+      Iterator<Map<String, String>> jobIter = mapper.readValues(
+          jsonF.createParser(input), type);
 
       while (jobIter.hasNext()) {
         try {
-          Map jsonJob = jobIter.next();
-          AMDefinitionSLS amDef = AMDefinitionFactory.createFromSlsTrace(
-              jsonJob, this);
+          Map<String, String> jsonJob = jobIter.next();
+          AMDefinitionSLS amDef = AMDefinitionFactory.createFromSlsTrace(jsonJob, this);
           startAMs(amDef);
         } catch (Exception e) {
           LOG.error("Failed to create an AM: {}", e.getMessage());
@@ -500,7 +501,6 @@ public class SLSRunner extends Configured implements Tool {
   /**
    * Parse workload from a rumen trace file.
    */
-  @SuppressWarnings("unchecked")
   private void startAMFromRumenTrace(String inputTrace, long baselineTimeMS)
       throws IOException {
     Configuration conf = new Configuration();
@@ -536,7 +536,6 @@ public class SLSRunner extends Configured implements Tool {
   /**
    * parse workload information from synth-generator trace files.
    */
-  @SuppressWarnings("unchecked")
   private void startAMFromSynthGenerator() throws YarnException, IOException {
     Configuration localConf = new Configuration();
     localConf.set("fs.defaultFS", "file:///");
@@ -729,17 +728,17 @@ public class SLSRunner extends Configured implements Tool {
       throw new YarnException("Cannot create output directory");
     }
 
-    Set<String> trackedJobSet = new HashSet<String>();
+    Set<String> trackedJobSet = new HashSet<>();
     if (cmd.hasOption("trackjobs")) {
       String trackjobs = cmd.getOptionValue("trackjobs");
-      String jobIds[] = trackjobs.split(",");
+      String[] jobIds = trackjobs.split(",");
       trackedJobSet.addAll(Arrays.asList(jobIds));
     }
 
     String tempNodeFile =
         cmd.hasOption("nodes") ? cmd.getOptionValue("nodes") : "";
 
-    TraceType tempTraceType = TraceType.SLS;
+    TraceType tempTraceType;
     switch (traceType) {
     case "SLS":
       tempTraceType = TraceType.SLS;
@@ -834,9 +833,7 @@ public class SLSRunner extends Configured implements Tool {
       NodeDetails that = (NodeDetails) o;
 
       return StringUtils.equals(hostname, that.hostname) && (
-          nodeResource == null ?
-              that.nodeResource == null :
-              nodeResource.equals(that.nodeResource)) && SetUtils
+          Objects.equals(nodeResource, that.nodeResource)) && SetUtils
           .isEqualSet(labels, that.labels);
     }
 
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/TaskContainerDefinition.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/TaskContainerDefinition.java
index 1b0cd90..04bea3a 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/TaskContainerDefinition.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/TaskContainerDefinition.java
@@ -88,9 +88,9 @@ public class TaskContainerDefinition {
       return new Builder();
     }
 
-    public Builder withDuration(Map<?, ?> jsonTask, String key) {
+    public Builder withDuration(Map<String, String> jsonTask, String key) {
       if (jsonTask.containsKey(key)) {
-        this.duration = Integer.parseInt(jsonTask.get(key).toString());
+        this.duration = Integer.parseInt(jsonTask.get(key));
       }
       return this;
     }
@@ -106,23 +106,23 @@ public class TaskContainerDefinition {
      * @param key The json key.
      * @return the builder
      */
-    public Builder withDurationLegacy(Map<?, ?> jsonTask, String key) {
+    public Builder withDurationLegacy(Map<String, String> jsonTask, String key) {
       if (jsonTask.containsKey(key)) {
-        this.durationLegacy = Integer.parseInt(jsonTask.get(key).toString());
+        this.durationLegacy = Integer.parseInt(jsonTask.get(key));
       }
       return this;
     }
 
-    public Builder withTaskStart(Map<?, ?> jsonTask, String key) {
+    public Builder withTaskStart(Map<String, String> jsonTask, String key) {
       if (jsonTask.containsKey(key)) {
-        this.taskStart = Long.parseLong(jsonTask.get(key).toString());
+        this.taskStart = Long.parseLong(jsonTask.get(key));
       }
       return this;
     }
 
-    public Builder withTaskFinish(Map<?, ?> jsonTask, String key) {
+    public Builder withTaskFinish(Map<String, String> jsonTask, String key) {
       if (jsonTask.containsKey(key)) {
-        this.taskFinish = Long.parseLong(jsonTask.get(key).toString());
+        this.taskFinish = Long.parseLong(jsonTask.get(key));
       }
       return this;
     }
@@ -132,9 +132,9 @@ public class TaskContainerDefinition {
       return this;
     }
 
-    public Builder withPriority(Map<?, ?> jsonTask, String key) {
+    public Builder withPriority(Map<String, String> jsonTask, String key) {
       if (jsonTask.containsKey(key)) {
-        this.priority = Integer.parseInt(jsonTask.get(key).toString());
+        this.priority = Integer.parseInt(jsonTask.get(key));
       }
       return this;
     }
@@ -144,9 +144,9 @@ public class TaskContainerDefinition {
       return this;
     }
 
-    public Builder withType(Map<?, ?> jsonTask, String key) {
+    public Builder withType(Map<String, String> jsonTask, String key) {
       if (jsonTask.containsKey(key)) {
-        this.type = jsonTask.get(key).toString();
+        this.type = jsonTask.get(key);
       }
       return this;
     }
@@ -156,9 +156,9 @@ public class TaskContainerDefinition {
       return this;
     }
 
-    public Builder withCount(Map<?, ?> jsonTask, String key) {
+    public Builder withCount(Map<String, String> jsonTask, String key) {
       if (jsonTask.containsKey(key)) {
-        count = Integer.parseInt(jsonTask.get(key).toString());
+        count = Integer.parseInt(jsonTask.get(key));
         count = Math.max(count, 1);
       }
       return this;
@@ -169,10 +169,9 @@ public class TaskContainerDefinition {
       return this;
     }
 
-    public Builder withExecutionType(Map<?, ?> jsonTask, String key) {
+    public Builder withExecutionType(Map<String, String> jsonTask, String key) {
       if (jsonTask.containsKey(key)) {
-        this.executionType = ExecutionType.valueOf(
-            jsonTask.get(key).toString());
+        this.executionType = ExecutionType.valueOf(jsonTask.get(key));
       }
       return this;
     }
@@ -182,9 +181,9 @@ public class TaskContainerDefinition {
       return this;
     }
 
-    public Builder withAllocationId(Map<?, ?> jsonTask, String key) {
+    public Builder withAllocationId(Map<String, String> jsonTask, String key) {
       if (jsonTask.containsKey(key)) {
-        this.allocationId = Long.parseLong(jsonTask.get(key).toString());
+        this.allocationId = Long.parseLong(jsonTask.get(key));
       }
       return this;
     }
@@ -194,9 +193,9 @@ public class TaskContainerDefinition {
       return this;
     }
 
-    public Builder withRequestDelay(Map<?, ?> jsonTask, String key) {
+    public Builder withRequestDelay(Map<String, String> jsonTask, String key) {
       if (jsonTask.containsKey(key)) {
-        requestDelay = Long.parseLong(jsonTask.get(key).toString());
+        requestDelay = Long.parseLong(jsonTask.get(key));
         requestDelay = Math.max(requestDelay, 0);
       }
       return this;
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSSchedulerCommons.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSSchedulerCommons.java
index 92aa960..7132fc9 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSSchedulerCommons.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSSchedulerCommons.java
@@ -54,22 +54,19 @@ import java.util.concurrent.ConcurrentHashMap;
 public class SLSSchedulerCommons {
   private static final Logger LOG = LoggerFactory.getLogger(SLSSchedulerCommons.class);
 
-  private AbstractYarnScheduler scheduler;
+  private final AbstractYarnScheduler<?, ?> scheduler;
   private boolean metricsON;
   private SchedulerMetrics schedulerMetrics;
-  private Map<ContainerId, Resource> preemptionContainerMap =
-      new ConcurrentHashMap<>();
-
-  private Map<ApplicationAttemptId, String> appQueueMap =
-      new ConcurrentHashMap<>();
-  private Tracker tracker;
+  private final Map<ContainerId, Resource> preemptionContainerMap = new ConcurrentHashMap<>();
+  private final Map<ApplicationAttemptId, String> appQueueMap = new ConcurrentHashMap<>();
+  private final Tracker tracker;
   
   public SLSSchedulerCommons(AbstractYarnScheduler scheduler) {
     this.scheduler = scheduler;
     this.tracker = new Tracker();
   }
 
-  public void initMetrics(Class<? extends AbstractYarnScheduler> schedulerClass, Configuration conf) {
+  public void initMetrics(Class<? extends AbstractYarnScheduler<?, ?>> schedulerClass, Configuration conf) {
     metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true);
     if (metricsON) {
       try {

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org