You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/05/09 05:47:17 UTC
[34/50] [abbrv] hadoop git commit: YARN-6522. Make SLS JSON input
file format simple and scalable (yufeigu via rkanter)
YARN-6522. Make SLS JSON input file format simple and scalable (yufeigu via rkanter)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3082552b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3082552b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3082552b
Branch: refs/heads/HDFS-7240
Commit: 3082552b3b991df846caf572b58e44308ddf8eeb
Parents: 07761af
Author: Robert Kanter <rk...@apache.org>
Authored: Thu May 4 17:21:46 2017 -0700
Committer: Robert Kanter <rk...@apache.org>
Committed: Thu May 4 17:21:46 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/yarn/sls/SLSRunner.java | 102 ++++++++++++++-----
.../hadoop/yarn/sls/appmaster/AMSimulator.java | 42 ++++----
.../sls/synthetic/SynthTraceJobProducer.java | 2 +-
.../apache/hadoop/yarn/sls/utils/SLSUtils.java | 49 ++++++---
.../src/site/markdown/SchedulerLoadSimulator.md | 28 +++--
.../hadoop/yarn/sls/utils/TestSLSUtils.java | 30 ++++++
6 files changed, 182 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3082552b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 9d35d1b..ddd35ef 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -119,6 +119,9 @@ public class SLSRunner extends Configured implements Tool {
// logger
public final static Logger LOG = Logger.getLogger(SLSRunner.class);
+ private final static int DEFAULT_MAPPER_PRIORITY = 20;
+ private final static int DEFAULT_REDUCER_PRIORITY = 10;
+
/**
* The type of trace in input.
*/
@@ -247,8 +250,8 @@ public class SLSRunner extends Configured implements Tool {
break;
case SYNTH:
stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
- nodeSet.addAll(SLSUtils.generateNodesFromSynth(stjp.getNumNodes(),
- stjp.getNodesPerRack()));
+ nodeSet.addAll(SLSUtils.generateNodes(stjp.getNumNodes(),
+ stjp.getNumNodes()/stjp.getNodesPerRack()));
break;
default:
throw new YarnException("Input configuration not recognized, "
@@ -259,6 +262,10 @@ public class SLSRunner extends Configured implements Tool {
nodeSet.addAll(SLSUtils.parseNodesFromNodeFile(nodeFile));
}
+ if (nodeSet.size() == 0) {
+ throw new YarnException("No node! Please configure nodes.");
+ }
+
// create NM simulators
Random random = new Random();
Set<String> rackSet = new HashSet<String>();
@@ -348,7 +355,11 @@ public class SLSRunner extends Configured implements Tool {
private void createAMForJob(Map jsonJob) throws YarnException {
long jobStartTime = Long.parseLong(jsonJob.get("job.start.ms").toString());
- long jobFinishTime = Long.parseLong(jsonJob.get("job.end.ms").toString());
+
+ long jobFinishTime = 0;
+ if (jsonJob.containsKey("job.end.ms")) {
+ jobFinishTime = Long.parseLong(jsonJob.get("job.end.ms").toString());
+ }
String user = (String) jsonJob.get("job.user");
if (user == null) {
@@ -358,25 +369,49 @@ public class SLSRunner extends Configured implements Tool {
String queue = jsonJob.get("job.queue.name").toString();
increaseQueueAppNum(queue);
- String oldAppId = jsonJob.get("job.id").toString();
+ String oldAppId = (String)jsonJob.get("job.id");
+ if (oldAppId == null) {
+ oldAppId = Integer.toString(AM_ID);
+ }
- // tasks
+ String amType = (String)jsonJob.get("am.type");
+ if (amType == null) {
+ amType = SLSUtils.DEFAULT_JOB_TYPE;
+ }
+
+ runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime,
+ getTaskContainers(jsonJob), null);
+ }
+
+ private List<ContainerSimulator> getTaskContainers(Map jsonJob)
+ throws YarnException {
+ List<ContainerSimulator> containers = new ArrayList<>();
List tasks = (List) jsonJob.get("job.tasks");
if (tasks == null || tasks.size() == 0) {
throw new YarnException("No task for the job!");
}
- List<ContainerSimulator> containerList = new ArrayList<>();
for (Object o : tasks) {
Map jsonTask = (Map) o;
- String hostname = jsonTask.get("container.host").toString();
- long taskStart = Long.parseLong(jsonTask.get("container.start.ms")
- .toString());
- long taskFinish = Long.parseLong(jsonTask.get("container.end.ms")
- .toString());
- long lifeTime = taskFinish - taskStart;
-
- // Set memory and vcores from job trace file
+
+ String hostname = (String) jsonTask.get("container.host");
+
+ long duration = 0;
+ if (jsonTask.containsKey("duration.ms")) {
+ duration = Integer.parseInt(jsonTask.get("duration.ms").toString());
+ } else if (jsonTask.containsKey("container.start.ms") &&
+ jsonTask.containsKey("container.end.ms")) {
+ long taskStart = Long.parseLong(jsonTask.get("container.start.ms")
+ .toString());
+ long taskFinish = Long.parseLong(jsonTask.get("container.end.ms")
+ .toString());
+ duration = taskFinish - taskStart;
+ }
+ if (duration <= 0) {
+ throw new YarnException("Duration of a task shouldn't be less or equal"
+ + " to 0!");
+ }
+
Resource res = getDefaultContainerResource();
if (jsonTask.containsKey("container.memory")) {
int containerMemory =
@@ -390,17 +425,30 @@ public class SLSRunner extends Configured implements Tool {
res.setVirtualCores(containerVCores);
}
- int priority = Integer.parseInt(jsonTask.get("container.priority")
- .toString());
- String type = jsonTask.get("container.type").toString();
- containerList.add(
- new ContainerSimulator(res, lifeTime, hostname, priority, type));
+ int priority = DEFAULT_MAPPER_PRIORITY;
+ if (jsonTask.containsKey("container.priority")) {
+ priority = Integer.parseInt(jsonTask.get("container.priority")
+ .toString());
+ }
+
+ String type = "map";
+ if (jsonTask.containsKey("container.type")) {
+ type = jsonTask.get("container.type").toString();
+ }
+
+ int count = 1;
+ if (jsonTask.containsKey("count")) {
+ count = Integer.parseInt(jsonTask.get("count").toString());
+ }
+ count = Math.max(count, 1);
+
+ for (int i = 0; i < count; i++) {
+ containers.add(
+ new ContainerSimulator(res, duration, hostname, priority, type));
+ }
}
- // create a new AM
- String amType = jsonJob.get("am.type").toString();
- runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime,
- containerList, null);
+ return containers;
}
/**
@@ -463,7 +511,7 @@ public class SLSRunner extends Configured implements Tool {
taskAttempt.getStartTime();
containerList.add(
new ContainerSimulator(getDefaultContainerResource(),
- containerLifeTime, hostname, 10, "map"));
+ containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map"));
}
// reducer
@@ -479,7 +527,7 @@ public class SLSRunner extends Configured implements Tool {
taskAttempt.getStartTime();
containerList.add(
new ContainerSimulator(getDefaultContainerResource(),
- containerLifeTime, hostname, 20, "reduce"));
+ containerLifeTime, hostname, DEFAULT_REDUCER_PRIORITY, "reduce"));
}
// Only supports the default job type currently
@@ -559,7 +607,7 @@ public class SLSRunner extends Configured implements Tool {
Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(),
(int) tai.getTaskInfo().getTaskVCores());
containerList.add(new ContainerSimulator(containerResource,
- containerLifeTime, hostname, 10, "map"));
+ containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map"));
maxMapRes = Resources.componentwiseMax(maxMapRes, containerResource);
maxMapDur =
containerLifeTime > maxMapDur ? containerLifeTime : maxMapDur;
@@ -579,7 +627,7 @@ public class SLSRunner extends Configured implements Tool {
Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(),
(int) tai.getTaskInfo().getTaskVCores());
containerList.add(new ContainerSimulator(containerResource,
- containerLifeTime, hostname, 20, "reduce"));
+ containerLifeTime, hostname, DEFAULT_REDUCER_PRIORITY, "reduce"));
maxRedRes = Resources.componentwiseMax(maxRedRes, containerResource);
maxRedDur =
containerLifeTime > maxRedDur ? containerLifeTime : maxRedDur;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3082552b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
index 45a3c07..70c5579 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
@@ -400,26 +400,28 @@ public abstract class AMSimulator extends TaskRunner.Task {
Map<String, ResourceRequest> nodeLocalRequestMap = new HashMap<String, ResourceRequest>();
ResourceRequest anyRequest = null;
for (ContainerSimulator cs : csList) {
- String rackHostNames[] = SLSUtils.getRackHostName(cs.getHostname());
- // check rack local
- String rackname = "/" + rackHostNames[0];
- if (rackLocalRequestMap.containsKey(rackname)) {
- rackLocalRequestMap.get(rackname).setNumContainers(
- rackLocalRequestMap.get(rackname).getNumContainers() + 1);
- } else {
- ResourceRequest request = createResourceRequest(
- cs.getResource(), rackname, priority, 1);
- rackLocalRequestMap.put(rackname, request);
- }
- // check node local
- String hostname = rackHostNames[1];
- if (nodeLocalRequestMap.containsKey(hostname)) {
- nodeLocalRequestMap.get(hostname).setNumContainers(
- nodeLocalRequestMap.get(hostname).getNumContainers() + 1);
- } else {
- ResourceRequest request = createResourceRequest(
- cs.getResource(), hostname, priority, 1);
- nodeLocalRequestMap.put(hostname, request);
+ if (cs.getHostname() != null) {
+ String[] rackHostNames = SLSUtils.getRackHostName(cs.getHostname());
+ // check rack local
+ String rackname = "/" + rackHostNames[0];
+ if (rackLocalRequestMap.containsKey(rackname)) {
+ rackLocalRequestMap.get(rackname).setNumContainers(
+ rackLocalRequestMap.get(rackname).getNumContainers() + 1);
+ } else {
+ ResourceRequest request =
+ createResourceRequest(cs.getResource(), rackname, priority, 1);
+ rackLocalRequestMap.put(rackname, request);
+ }
+ // check node local
+ String hostname = rackHostNames[1];
+ if (nodeLocalRequestMap.containsKey(hostname)) {
+ nodeLocalRequestMap.get(hostname).setNumContainers(
+ nodeLocalRequestMap.get(hostname).getNumContainers() + 1);
+ } else {
+ ResourceRequest request =
+ createResourceRequest(cs.getResource(), hostname, priority, 1);
+ nodeLocalRequestMap.put(hostname, request);
+ }
}
// any
if (anyRequest == null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3082552b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
index 3d2ec94..c89e4e2 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
@@ -131,7 +131,7 @@ public class SynthTraceJobProducer implements JobStoryProducer {
}
public int getNodesPerRack() {
- return trace.nodes_per_rack;
+ return trace.nodes_per_rack < 1 ? 1: trace.nodes_per_rack;
}
public int getNumNodes() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3082552b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
index e27b36f..dbc2dab 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
@@ -101,7 +101,7 @@ public class SLSUtils {
*/
public static Set<String> parseNodesFromSLSTrace(String jobTrace)
throws IOException {
- Set<String> nodeSet = new HashSet<String>();
+ Set<String> nodeSet = new HashSet<>();
JsonFactory jsonF = new JsonFactory();
ObjectMapper mapper = new ObjectMapper();
Reader input =
@@ -109,13 +109,7 @@ public class SLSUtils {
try {
Iterator<Map> i = mapper.readValues(jsonF.createParser(input), Map.class);
while (i.hasNext()) {
- Map jsonE = i.next();
- List tasks = (List) jsonE.get("job.tasks");
- for (Object o : tasks) {
- Map jsonTask = (Map) o;
- String hostname = jsonTask.get("container.host").toString();
- nodeSet.add(hostname);
- }
+ addNodes(nodeSet, i.next());
}
} finally {
input.close();
@@ -123,6 +117,29 @@ public class SLSUtils {
return nodeSet;
}
+ private static void addNodes(Set<String> nodeSet, Map jsonEntry) {
+ if (jsonEntry.containsKey("num.nodes")) {
+ int numNodes = Integer.parseInt(jsonEntry.get("num.nodes").toString());
+ int numRacks = 1;
+ if (jsonEntry.containsKey("num.racks")) {
+ numRacks = Integer.parseInt(
+ jsonEntry.get("num.racks").toString());
+ }
+ nodeSet.addAll(generateNodes(numNodes, numRacks));
+ }
+
+ if (jsonEntry.containsKey("job.tasks")) {
+ List tasks = (List) jsonEntry.get("job.tasks");
+ for (Object o : tasks) {
+ Map jsonTask = (Map) o;
+ String hostname = (String) jsonTask.get("container.host");
+ if (hostname != null) {
+ nodeSet.add(hostname);
+ }
+ }
+ }
+ }
+
/**
* parse the input node file, return each host name
*/
@@ -150,11 +167,19 @@ public class SLSUtils {
return nodeSet;
}
- public static Set<? extends String> generateNodesFromSynth(
- int numNodes, int nodesPerRack) {
- Set<String> nodeSet = new HashSet<String>();
+ public static Set<? extends String> generateNodes(int numNodes,
+ int numRacks){
+ Set<String> nodeSet = new HashSet<>();
+ if (numRacks < 1) {
+ numRacks = 1;
+ }
+
+ if (numRacks > numNodes) {
+ numRacks = numNodes;
+ }
+
for (int i = 0; i < numNodes; i++) {
- nodeSet.add("/rack" + i % nodesPerRack + "/node" + i);
+ nodeSet.add("/rack" + i % numRacks + "/node" + i);
}
return nodeSet;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3082552b/hadoop-tools/hadoop-sls/src/site/markdown/SchedulerLoadSimulator.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/site/markdown/SchedulerLoadSimulator.md b/hadoop-tools/hadoop-sls/src/site/markdown/SchedulerLoadSimulator.md
index f0e3b8c..6e00e9a 100644
--- a/hadoop-tools/hadoop-sls/src/site/markdown/SchedulerLoadSimulator.md
+++ b/hadoop-tools/hadoop-sls/src/site/markdown/SchedulerLoadSimulator.md
@@ -328,18 +328,24 @@ Appendix
Here we provide an example format of the sls json file, which contains 2 jobs. The first job has 3 map tasks and the second one has 2 map tasks.
{
- "am.type" : "mapreduce",
- "job.start.ms" : 0,
- "job.end.ms" : 95375,
- "job.queue.name" : "sls_queue_1",
- "job.id" : "job_1",
- "job.user" : "default",
+ "num.nodes": 3, // total number of nodes in the cluster
+ "num.racks": 1 // total number of racks in the cluster; num.nodes is divided evenly among the racks; optional, the default value is 1
+ }
+ {
+ "am.type" : "mapreduce", // type of AM, optional, the default value is "mapreduce"
+ "job.start.ms" : 0, // job start time
+ "job.end.ms" : 95375, // job finish time, optional, the default value is 0
+ "job.queue.name" : "sls_queue_1", // the queue job will be submitted to
+ "job.id" : "job_1", // the job id used to track the job, optional, the default value is a zero-based integer increasing with number of jobs
+ "job.user" : "default", // user, optional, the default value is "default"
"job.tasks" : [ {
- "container.host" : "/default-rack/node1",
- "container.start.ms" : 6664,
- "container.end.ms" : 23707,
- "container.priority" : 20,
- "container.type" : "map"
+ "count": 1, // number of tasks, optional, the default value is 1
+ "container.host" : "/default-rack/node1", // host the container asks for
+ "container.start.ms" : 6664, // container start time, optional
+ "container.end.ms" : 23707, // container finish time, optional
+ "duration.ms": 50000, // duration of the container, optional if start and end times are specified
+ "container.priority" : 20, // priority of the container, optional, the default value is 20
+ "container.type" : "map" // type of the container, could be "map" or "reduce", optional, the default value is "map"
}, {
"container.host" : "/default-rack/node3",
"container.start.ms" : 6665,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3082552b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java
index f4eda67..30964a1 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java
@@ -21,6 +21,9 @@ package org.apache.hadoop.yarn.sls.utils;
import org.junit.Assert;
import org.junit.Test;
+import java.util.HashSet;
+import java.util.Set;
+
public class TestSLSUtils {
@Test
@@ -36,4 +39,31 @@ public class TestSLSUtils {
Assert.assertEquals(rackHostname[1], "node1");
}
+ @Test
+ public void testGenerateNodes() {
+ Set<? extends String> nodes = SLSUtils.generateNodes(3, 3);
+ Assert.assertEquals("Number of nodes is wrong.", 3, nodes.size());
+ Assert.assertEquals("Number of racks is wrong.", 3, getNumRack(nodes));
+
+ nodes = SLSUtils.generateNodes(3, 1);
+ Assert.assertEquals("Number of nodes is wrong.", 3, nodes.size());
+ Assert.assertEquals("Number of racks is wrong.", 1, getNumRack(nodes));
+
+ nodes = SLSUtils.generateNodes(3, 4);
+ Assert.assertEquals("Number of nodes is wrong.", 3, nodes.size());
+ Assert.assertEquals("Number of racks is wrong.", 3, getNumRack(nodes));
+
+ nodes = SLSUtils.generateNodes(3, 0);
+ Assert.assertEquals("Number of nodes is wrong.", 3, nodes.size());
+ Assert.assertEquals("Number of racks is wrong.", 1, getNumRack(nodes));
+ }
+
+ private int getNumRack(Set<? extends String> nodes) {
+ Set<String> racks = new HashSet<>();
+ for (String node : nodes) {
+ String[] rackHostname = SLSUtils.getRackHostName(node);
+ racks.add(rackHostname[0]);
+ }
+ return racks.size();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org