You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/05/09 05:47:17 UTC

[34/50] [abbrv] hadoop git commit: YARN-6522. Make SLS JSON input file format simple and scalable (yufeigu via rkanter)

YARN-6522. Make SLS JSON input file format simple and scalable (yufeigu via rkanter)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3082552b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3082552b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3082552b

Branch: refs/heads/HDFS-7240
Commit: 3082552b3b991df846caf572b58e44308ddf8eeb
Parents: 07761af
Author: Robert Kanter <rk...@apache.org>
Authored: Thu May 4 17:21:46 2017 -0700
Committer: Robert Kanter <rk...@apache.org>
Committed: Thu May 4 17:21:46 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/yarn/sls/SLSRunner.java   | 102 ++++++++++++++-----
 .../hadoop/yarn/sls/appmaster/AMSimulator.java  |  42 ++++----
 .../sls/synthetic/SynthTraceJobProducer.java    |   2 +-
 .../apache/hadoop/yarn/sls/utils/SLSUtils.java  |  49 ++++++---
 .../src/site/markdown/SchedulerLoadSimulator.md |  28 +++--
 .../hadoop/yarn/sls/utils/TestSLSUtils.java     |  30 ++++++
 6 files changed, 182 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3082552b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 9d35d1b..ddd35ef 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -119,6 +119,9 @@ public class SLSRunner extends Configured implements Tool {
   // logger
   public final static Logger LOG = Logger.getLogger(SLSRunner.class);
 
+  private final static int DEFAULT_MAPPER_PRIORITY = 20;
+  private final static int DEFAULT_REDUCER_PRIORITY = 10;
+
   /**
    * The type of trace in input.
    */
@@ -247,8 +250,8 @@ public class SLSRunner extends Configured implements Tool {
           break;
         case SYNTH:
           stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
-          nodeSet.addAll(SLSUtils.generateNodesFromSynth(stjp.getNumNodes(),
-              stjp.getNodesPerRack()));
+          nodeSet.addAll(SLSUtils.generateNodes(stjp.getNumNodes(),
+              stjp.getNumNodes()/stjp.getNodesPerRack()));
           break;
         default:
           throw new YarnException("Input configuration not recognized, "
@@ -259,6 +262,10 @@ public class SLSRunner extends Configured implements Tool {
       nodeSet.addAll(SLSUtils.parseNodesFromNodeFile(nodeFile));
     }
 
+    if (nodeSet.size() == 0) {
+      throw new YarnException("No node! Please configure nodes.");
+    }
+
     // create NM simulators
     Random random = new Random();
     Set<String> rackSet = new HashSet<String>();
@@ -348,7 +355,11 @@ public class SLSRunner extends Configured implements Tool {
 
   private void createAMForJob(Map jsonJob) throws YarnException {
     long jobStartTime = Long.parseLong(jsonJob.get("job.start.ms").toString());
-    long jobFinishTime = Long.parseLong(jsonJob.get("job.end.ms").toString());
+
+    long jobFinishTime = 0;
+    if (jsonJob.containsKey("job.end.ms")) {
+      jobFinishTime = Long.parseLong(jsonJob.get("job.end.ms").toString());
+    }
 
     String user = (String) jsonJob.get("job.user");
     if (user == null) {
@@ -358,25 +369,49 @@ public class SLSRunner extends Configured implements Tool {
     String queue = jsonJob.get("job.queue.name").toString();
     increaseQueueAppNum(queue);
 
-    String oldAppId = jsonJob.get("job.id").toString();
+    String oldAppId = (String)jsonJob.get("job.id");
+    if (oldAppId == null) {
+      oldAppId = Integer.toString(AM_ID);
+    }
 
-    // tasks
+    String amType = (String)jsonJob.get("am.type");
+    if (amType == null) {
+      amType = SLSUtils.DEFAULT_JOB_TYPE;
+    }
+
+    runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime,
+        getTaskContainers(jsonJob), null);
+  }
+
+  private List<ContainerSimulator> getTaskContainers(Map jsonJob)
+      throws YarnException {
+    List<ContainerSimulator> containers = new ArrayList<>();
     List tasks = (List) jsonJob.get("job.tasks");
     if (tasks == null || tasks.size() == 0) {
       throw new YarnException("No task for the job!");
     }
 
-    List<ContainerSimulator> containerList = new ArrayList<>();
     for (Object o : tasks) {
       Map jsonTask = (Map) o;
-      String hostname = jsonTask.get("container.host").toString();
-      long taskStart = Long.parseLong(jsonTask.get("container.start.ms")
-          .toString());
-      long taskFinish = Long.parseLong(jsonTask.get("container.end.ms")
-          .toString());
-      long lifeTime = taskFinish - taskStart;
-
-      // Set memory and vcores from job trace file
+
+      String hostname = (String) jsonTask.get("container.host");
+
+      long duration = 0;
+      if (jsonTask.containsKey("duration.ms")) {
+        duration = Integer.parseInt(jsonTask.get("duration.ms").toString());
+      } else if (jsonTask.containsKey("container.start.ms") &&
+          jsonTask.containsKey("container.end.ms")) {
+        long taskStart = Long.parseLong(jsonTask.get("container.start.ms")
+            .toString());
+        long taskFinish = Long.parseLong(jsonTask.get("container.end.ms")
+            .toString());
+        duration = taskFinish - taskStart;
+      }
+      if (duration <= 0) {
+        throw new YarnException("Duration of a task shouldn't be less or equal"
+            + " to 0!");
+      }
+
       Resource res = getDefaultContainerResource();
       if (jsonTask.containsKey("container.memory")) {
         int containerMemory =
@@ -390,17 +425,30 @@ public class SLSRunner extends Configured implements Tool {
         res.setVirtualCores(containerVCores);
       }
 
-      int priority = Integer.parseInt(jsonTask.get("container.priority")
-          .toString());
-      String type = jsonTask.get("container.type").toString();
-      containerList.add(
-          new ContainerSimulator(res, lifeTime, hostname, priority, type));
+      int priority = DEFAULT_MAPPER_PRIORITY;
+      if (jsonTask.containsKey("container.priority")) {
+        priority = Integer.parseInt(jsonTask.get("container.priority")
+            .toString());
+      }
+
+      String type = "map";
+      if (jsonTask.containsKey("container.type")) {
+        type = jsonTask.get("container.type").toString();
+      }
+
+      int count = 1;
+      if (jsonTask.containsKey("count")) {
+        count = Integer.parseInt(jsonTask.get("count").toString());
+      }
+      count = Math.max(count, 1);
+
+      for (int i = 0; i < count; i++) {
+        containers.add(
+            new ContainerSimulator(res, duration, hostname, priority, type));
+      }
     }
 
-    // create a new AM
-    String amType = jsonJob.get("am.type").toString();
-    runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime,
-        containerList, null);
+    return containers;
   }
 
   /**
@@ -463,7 +511,7 @@ public class SLSRunner extends Configured implements Tool {
           taskAttempt.getStartTime();
       containerList.add(
           new ContainerSimulator(getDefaultContainerResource(),
-              containerLifeTime, hostname, 10, "map"));
+              containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map"));
     }
 
     // reducer
@@ -479,7 +527,7 @@ public class SLSRunner extends Configured implements Tool {
           taskAttempt.getStartTime();
       containerList.add(
           new ContainerSimulator(getDefaultContainerResource(),
-              containerLifeTime, hostname, 20, "reduce"));
+              containerLifeTime, hostname, DEFAULT_REDUCER_PRIORITY, "reduce"));
     }
 
     // Only supports the default job type currently
@@ -559,7 +607,7 @@ public class SLSRunner extends Configured implements Tool {
               Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(),
                   (int) tai.getTaskInfo().getTaskVCores());
           containerList.add(new ContainerSimulator(containerResource,
-              containerLifeTime, hostname, 10, "map"));
+              containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map"));
           maxMapRes = Resources.componentwiseMax(maxMapRes, containerResource);
           maxMapDur =
               containerLifeTime > maxMapDur ? containerLifeTime : maxMapDur;
@@ -579,7 +627,7 @@ public class SLSRunner extends Configured implements Tool {
               Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(),
                   (int) tai.getTaskInfo().getTaskVCores());
           containerList.add(new ContainerSimulator(containerResource,
-              containerLifeTime, hostname, 20, "reduce"));
+              containerLifeTime, hostname, DEFAULT_REDUCER_PRIORITY, "reduce"));
           maxRedRes = Resources.componentwiseMax(maxRedRes, containerResource);
           maxRedDur =
               containerLifeTime > maxRedDur ? containerLifeTime : maxRedDur;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3082552b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
index 45a3c07..70c5579 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
@@ -400,26 +400,28 @@ public abstract class AMSimulator extends TaskRunner.Task {
     Map<String, ResourceRequest> nodeLocalRequestMap = new HashMap<String, ResourceRequest>();
     ResourceRequest anyRequest = null;
     for (ContainerSimulator cs : csList) {
-      String rackHostNames[] = SLSUtils.getRackHostName(cs.getHostname());
-      // check rack local
-      String rackname = "/" + rackHostNames[0];
-      if (rackLocalRequestMap.containsKey(rackname)) {
-        rackLocalRequestMap.get(rackname).setNumContainers(
-            rackLocalRequestMap.get(rackname).getNumContainers() + 1);
-      } else {
-        ResourceRequest request = createResourceRequest(
-                cs.getResource(), rackname, priority, 1);
-        rackLocalRequestMap.put(rackname, request);
-      }
-      // check node local
-      String hostname = rackHostNames[1];
-      if (nodeLocalRequestMap.containsKey(hostname)) {
-        nodeLocalRequestMap.get(hostname).setNumContainers(
-            nodeLocalRequestMap.get(hostname).getNumContainers() + 1);
-      } else {
-        ResourceRequest request = createResourceRequest(
-                cs.getResource(), hostname, priority, 1);
-        nodeLocalRequestMap.put(hostname, request);
+      if (cs.getHostname() != null) {
+        String[] rackHostNames = SLSUtils.getRackHostName(cs.getHostname());
+        // check rack local
+        String rackname = "/" + rackHostNames[0];
+        if (rackLocalRequestMap.containsKey(rackname)) {
+          rackLocalRequestMap.get(rackname).setNumContainers(
+              rackLocalRequestMap.get(rackname).getNumContainers() + 1);
+        } else {
+          ResourceRequest request =
+              createResourceRequest(cs.getResource(), rackname, priority, 1);
+          rackLocalRequestMap.put(rackname, request);
+        }
+        // check node local
+        String hostname = rackHostNames[1];
+        if (nodeLocalRequestMap.containsKey(hostname)) {
+          nodeLocalRequestMap.get(hostname).setNumContainers(
+              nodeLocalRequestMap.get(hostname).getNumContainers() + 1);
+        } else {
+          ResourceRequest request =
+              createResourceRequest(cs.getResource(), hostname, priority, 1);
+          nodeLocalRequestMap.put(hostname, request);
+        }
       }
       // any
       if (anyRequest == null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3082552b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
index 3d2ec94..c89e4e2 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
@@ -131,7 +131,7 @@ public class SynthTraceJobProducer implements JobStoryProducer {
   }
 
   public int getNodesPerRack() {
-    return trace.nodes_per_rack;
+    return trace.nodes_per_rack < 1 ? 1: trace.nodes_per_rack;
   }
 
   public int getNumNodes() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3082552b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
index e27b36f..dbc2dab 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
@@ -101,7 +101,7 @@ public class SLSUtils {
    */
   public static Set<String> parseNodesFromSLSTrace(String jobTrace)
           throws IOException {
-    Set<String> nodeSet = new HashSet<String>();
+    Set<String> nodeSet = new HashSet<>();
     JsonFactory jsonF = new JsonFactory();
     ObjectMapper mapper = new ObjectMapper();
     Reader input =
@@ -109,13 +109,7 @@ public class SLSUtils {
     try {
       Iterator<Map> i = mapper.readValues(jsonF.createParser(input), Map.class);
       while (i.hasNext()) {
-        Map jsonE = i.next();
-        List tasks = (List) jsonE.get("job.tasks");
-        for (Object o : tasks) {
-          Map jsonTask = (Map) o;
-          String hostname = jsonTask.get("container.host").toString();
-          nodeSet.add(hostname);
-        }
+        addNodes(nodeSet, i.next());
       }
     } finally {
       input.close();
@@ -123,6 +117,29 @@ public class SLSUtils {
     return nodeSet;
   }
 
+  private static void addNodes(Set<String> nodeSet, Map jsonEntry) {
+    if (jsonEntry.containsKey("num.nodes")) {
+      int numNodes = Integer.parseInt(jsonEntry.get("num.nodes").toString());
+      int numRacks = 1;
+      if (jsonEntry.containsKey("num.racks")) {
+        numRacks = Integer.parseInt(
+            jsonEntry.get("num.racks").toString());
+      }
+      nodeSet.addAll(generateNodes(numNodes, numRacks));
+    }
+
+    if (jsonEntry.containsKey("job.tasks")) {
+      List tasks = (List) jsonEntry.get("job.tasks");
+      for (Object o : tasks) {
+        Map jsonTask = (Map) o;
+        String hostname = (String) jsonTask.get("container.host");
+        if (hostname != null) {
+          nodeSet.add(hostname);
+        }
+      }
+    }
+  }
+
   /**
    * parse the input node file, return each host name
    */
@@ -150,11 +167,19 @@ public class SLSUtils {
     return nodeSet;
   }
 
-  public static Set<? extends String> generateNodesFromSynth(
-      int numNodes, int nodesPerRack) {
-    Set<String> nodeSet = new HashSet<String>();
+  public static Set<? extends String> generateNodes(int numNodes,
+      int numRacks){
+    Set<String> nodeSet = new HashSet<>();
+    if (numRacks < 1) {
+      numRacks = 1;
+    }
+
+    if (numRacks > numNodes) {
+      numRacks = numNodes;
+    }
+
     for (int i = 0; i < numNodes; i++) {
-      nodeSet.add("/rack" + i % nodesPerRack + "/node" + i);
+      nodeSet.add("/rack" + i % numRacks + "/node" + i);
     }
     return nodeSet;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3082552b/hadoop-tools/hadoop-sls/src/site/markdown/SchedulerLoadSimulator.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/site/markdown/SchedulerLoadSimulator.md b/hadoop-tools/hadoop-sls/src/site/markdown/SchedulerLoadSimulator.md
index f0e3b8c..6e00e9a 100644
--- a/hadoop-tools/hadoop-sls/src/site/markdown/SchedulerLoadSimulator.md
+++ b/hadoop-tools/hadoop-sls/src/site/markdown/SchedulerLoadSimulator.md
@@ -328,18 +328,24 @@ Appendix
 Here we provide an example format of the sls json file, which contains 2 jobs. The first job has 3 map tasks and the second one has 2 map tasks.
 
     {
-      "am.type" : "mapreduce",
-      "job.start.ms" : 0,
-      "job.end.ms" : 95375,
-      "job.queue.name" : "sls_queue_1",
-      "job.id" : "job_1",
-      "job.user" : "default",
+      "num.nodes": 3,  // total number of nodes in the cluster
+      "num.racks": 1   // total number of racks in the cluster; the num.nodes nodes are divided evenly among the racks. Optional, the default value is 1
+    }
+    {
+      "am.type" : "mapreduce", // type of AM, optional, the default value is "mapreduce"
+      "job.start.ms" : 0,      // job start time
+      "job.end.ms" : 95375,    // job finish time, optional, the default value is 0
+      "job.queue.name" : "sls_queue_1", // the queue job will be submitted to
+      "job.id" : "job_1",      // the job id used to track the job, optional, the default value is a zero-based integer increasing with the number of jobs
+      "job.user" : "default",  // user, optional, the default value is "default"
       "job.tasks" : [ {
-        "container.host" : "/default-rack/node1",
-        "container.start.ms" : 6664,
-        "container.end.ms" : 23707,
-        "container.priority" : 20,
-        "container.type" : "map"
+        "count": 1,    // number of tasks, optional, the default value is 1
+        "container.host" : "/default-rack/node1",  // host the container asks for
+        "container.start.ms" : 6664,  // container start time, optional
+        "container.end.ms" : 23707,   // container finish time, optional
+        "duration.ms":  50000,        // duration of the container, optional if both start and end times are specified; takes precedence over them when present
+        "container.priority" : 20,    // priority of the container, optional, the default value is 20
+        "container.type" : "map"      // type of the container, could be "map" or "reduce", optional, the default value is "map"
       }, {
         "container.host" : "/default-rack/node3",
         "container.start.ms" : 6665,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3082552b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java
index f4eda67..30964a1 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java
@@ -21,6 +21,9 @@ package org.apache.hadoop.yarn.sls.utils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.HashSet;
+import java.util.Set;
+
 public class TestSLSUtils {
 
   @Test
@@ -36,4 +39,31 @@ public class TestSLSUtils {
     Assert.assertEquals(rackHostname[1], "node1");
   }
 
+  @Test
+  public void testGenerateNodes() {
+    Set<? extends String> nodes = SLSUtils.generateNodes(3, 3);
+    Assert.assertEquals("Number of nodes is wrong.", 3, nodes.size());
+    Assert.assertEquals("Number of racks is wrong.", 3, getNumRack(nodes));
+
+    nodes = SLSUtils.generateNodes(3, 1);
+    Assert.assertEquals("Number of nodes is wrong.", 3, nodes.size());
+    Assert.assertEquals("Number of racks is wrong.", 1, getNumRack(nodes));
+
+    nodes = SLSUtils.generateNodes(3, 4);
+    Assert.assertEquals("Number of nodes is wrong.", 3, nodes.size());
+    Assert.assertEquals("Number of racks is wrong.", 3, getNumRack(nodes));
+
+    nodes = SLSUtils.generateNodes(3, 0);
+    Assert.assertEquals("Number of nodes is wrong.", 3, nodes.size());
+    Assert.assertEquals("Number of racks is wrong.", 1, getNumRack(nodes));
+  }
+
+  private int getNumRack(Set<? extends String> nodes) {
+    Set<String> racks = new HashSet<>();
+    for (String node : nodes) {
+      String[] rackHostname = SLSUtils.getRackHostName(node);
+      racks.add(rackHostname[0]);
+    }
+    return racks.size();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org