Posted to common-commits@hadoop.apache.org by qu...@apache.org on 2022/03/24 05:17:33 UTC

[hadoop] branch trunk updated: YARN-10547. Decouple job parsing logic from SLSRunner. Contributed by Szilard Nemeth.

This is an automated email from the ASF dual-hosted git repository.

quapaw pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 077c6c6  YARN-10547. Decouple job parsing logic from SLSRunner. Contributed by Szilard Nemeth.
077c6c6 is described below

commit 077c6c62d6c1ed89e209449a5f9c5849b05e7dff
Author: 9uapaw <gy...@gmail.com>
AuthorDate: Thu Mar 24 06:16:13 2022 +0100

    YARN-10547. Decouple job parsing logic from SLSRunner. Contributed by Szilard Nemeth.
---
 .../org/apache/hadoop/yarn/sls/AMDefinition.java   | 105 ++++++
 .../hadoop/yarn/sls/AMDefinitionFactory.java       | 133 ++++++++
 .../apache/hadoop/yarn/sls/AMDefinitionRumen.java  | 167 +++++++++
 .../apache/hadoop/yarn/sls/AMDefinitionSLS.java    | 186 ++++++++++
 .../apache/hadoop/yarn/sls/AMDefinitionSynth.java  | 146 ++++++++
 .../org/apache/hadoop/yarn/sls/JobDefinition.java  |  87 +++++
 .../java/org/apache/hadoop/yarn/sls/SLSRunner.java | 376 +++------------------
 .../hadoop/yarn/sls/TaskContainerDefinition.java   | 248 ++++++++++++++
 .../hadoop/yarn/sls/appmaster/AMSimulator.java     |  39 ++-
 .../hadoop/yarn/sls/appmaster/DAGAMSimulator.java  |  20 +-
 .../hadoop/yarn/sls/appmaster/MRAMSimulator.java   |  17 +-
 .../yarn/sls/appmaster/StreamAMSimulator.java      |  23 +-
 .../yarn/sls/scheduler/ContainerSimulator.java     |  37 +-
 .../org/apache/hadoop/yarn/sls/utils/SLSUtils.java |   2 -
 .../apache/hadoop/yarn/sls/TestDagAMSimulator.java |  16 +-
 .../hadoop/yarn/sls/appmaster/TestAMSimulator.java |  95 ++++--
 16 files changed, 1272 insertions(+), 425 deletions(-)

diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinition.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinition.java
new file mode 100644
index 0000000..1f9e351
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinition.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+
+import java.util.List;
+
+public abstract class AMDefinition {
+  protected int jobCount;
+  protected String amType;
+  protected String user;
+  protected String queue;
+  protected long jobStartTime;
+  protected long jobFinishTime;
+  protected List<ContainerSimulator> taskContainers;
+  protected Resource amResource;
+  protected String labelExpression;
+  protected String oldAppId;
+
+  public AMDefinition(AmDefinitionBuilder builder) {
+    this.jobStartTime = builder.jobStartTime;
+    this.jobFinishTime = builder.jobFinishTime;
+    this.amType = builder.amType;
+    this.taskContainers = builder.taskContainers;
+    this.labelExpression = builder.labelExpression;
+    this.user = builder.user;
+    this.amResource = builder.amResource;
+    this.queue = builder.queue;
+    this.jobCount = builder.jobCount;
+    this.oldAppId = builder.jobId;
+  }
+
+  public String getAmType() {
+    return amType;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public String getOldAppId() {
+    return oldAppId;
+  }
+
+  public long getJobStartTime() {
+    return jobStartTime;
+  }
+
+  public long getJobFinishTime() {
+    return jobFinishTime;
+  }
+
+  public List<ContainerSimulator> getTaskContainers() {
+    return taskContainers;
+  }
+
+  public Resource getAmResource() {
+    return amResource;
+  }
+
+  public String getLabelExpression() {
+    return labelExpression;
+  }
+
+  public String getQueue() {
+    return queue;
+  }
+
+  public int getJobCount() {
+    return jobCount;
+  }
+
+
+  public abstract static class AmDefinitionBuilder {
+    private static final String DEFAULT_USER = "default";
+
+    protected int jobCount = 1;
+    protected String amType = AMDefinitionFactory.DEFAULT_JOB_TYPE;
+    protected String user = DEFAULT_USER;
+    protected String queue;
+    protected String jobId;
+    protected long jobStartTime;
+    protected long jobFinishTime;
+    protected List<ContainerSimulator> taskContainers;
+    protected Resource amResource;
+    protected String labelExpression = null;
+
+  }
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionFactory.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionFactory.java
new file mode 100644
index 0000000..2bbe7bb
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionFactory.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.tools.rumen.LoggedJob;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public final class AMDefinitionFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AMDefinitionFactory.class);
+  public final static String DEFAULT_JOB_TYPE = "mapreduce";
+
+  private AMDefinitionFactory() {}
+
+  public static AMDefinitionSLS createFromSlsTrace(Map<?, ?> jsonJob,
+      SLSRunner slsRunner) throws YarnException {
+    AMDefinitionSLS amDefinition = AMDefinitionSLS.Builder.create(jsonJob)
+        .withAmType(SLSConfiguration.AM_TYPE)
+        .withAmResource(getAMContainerResourceSLS(jsonJob, slsRunner))
+        .withTaskContainers(
+            AMDefinitionSLS.getTaskContainers(jsonJob, slsRunner))
+        .withJobStartTime(SLSConfiguration.JOB_START_MS)
+        .withJobFinishTime(SLSConfiguration.JOB_END_MS)
+        .withLabelExpression(SLSConfiguration.JOB_LABEL_EXPR)
+        .withUser(SLSConfiguration.JOB_USER)
+        .withQueue(SLSConfiguration.JOB_QUEUE_NAME)
+        .withJobId(SLSConfiguration.JOB_ID)
+        .withJobCount(SLSConfiguration.JOB_COUNT)
+        .build();
+    slsRunner.increaseQueueAppNum(amDefinition.getQueue());
+    return amDefinition;
+  }
+
+  public static AMDefinitionRumen createFromRumenTrace(LoggedJob job,
+      long baselineTimeMs, SLSRunner slsRunner) throws YarnException {
+    AMDefinitionRumen amDefinition = AMDefinitionRumen.Builder.create()
+        .withAmType(DEFAULT_JOB_TYPE)
+        .withAmResource(getAMContainerResourceSynthAndRumen(slsRunner))
+        .withTaskContainers(
+            AMDefinitionRumen.getTaskContainers(job, slsRunner))
+        .withJobStartTime(job.getSubmitTime())
+        .withJobFinishTime(job.getFinishTime())
+        .withBaseLineTimeMs(baselineTimeMs)
+        .withUser(job.getUser())
+        .withQueue(job.getQueue().getValue())
+        .withJobId(job.getJobID().toString())
+        .build();
+    slsRunner.increaseQueueAppNum(amDefinition.getQueue());
+    return amDefinition;
+  }
+
+  public static AMDefinitionSynth createFromSynth(SynthJob job,
+      SLSRunner slsRunner) throws YarnException {
+    AMDefinitionSynth amDefinition =
+        AMDefinitionSynth.Builder.create()
+            .withAmType(job.getType())
+            .withAmResource(getAMContainerResourceSynthAndRumen(slsRunner))
+            .withTaskContainers(
+                AMDefinitionSynth.getTaskContainers(job, slsRunner))
+            .withUser(job.getUser())
+            .withQueue(job.getQueueName())
+            .withJobId(job.getJobID().toString())
+            .withJobStartTime(job.getSubmissionTime())
+            .withJobFinishTime(job.getSubmissionTime() + job.getDuration())
+            .withBaseLineTimeMs(0)
+            .build();
+
+    slsRunner.increaseQueueAppNum(amDefinition.getQueue());
+    return amDefinition;
+  }
+
+  private static Resource getAMContainerResourceSLS(Map<?, ?> jsonJob,
+      Configured configured) {
+    Resource amContainerResource =
+        SLSConfiguration.getAMContainerResource(configured.getConf());
+    if (jsonJob == null) {
+      return amContainerResource;
+    }
+
+    ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
+    for (ResourceInformation info : infors) {
+      String key = SLSConfiguration.JOB_AM_PREFIX + info.getName();
+      if (jsonJob.containsKey(key)) {
+        long value = Long.parseLong(jsonJob.get(key).toString());
+        amContainerResource.setResourceValue(info.getName(), value);
+      }
+    }
+
+    return amContainerResource;
+  }
+
+  private static Resource getAMContainerResourceSynthAndRumen(
+      Configured configured) {
+    return SLSConfiguration.getAMContainerResource(configured.getConf());
+  }
+
+  static void adjustTimeValuesToBaselineTime(AMDefinition amDef,
+      AMDefinition.AmDefinitionBuilder builder, long baselineTimeMs) {
+    builder.jobStartTime -= baselineTimeMs;
+    builder.jobFinishTime -= baselineTimeMs;
+    if (builder.jobStartTime < 0) {
+      LOG.warn("Warning: reset job {} start time to 0.", amDef.getOldAppId());
+      builder.jobFinishTime = builder.jobFinishTime - builder.jobStartTime;
+      builder.jobStartTime = 0;
+    }
+    amDef.jobStartTime = builder.jobStartTime;
+  }
+}
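
The adjustTimeValuesToBaselineTime() helper above rebases a job's start and finish times against a baseline (typically the submit time of the first job in the trace), clamping a negative start to zero while keeping the job's duration. A standalone restatement of that arithmetic with hypothetical numbers, not taken from the patch:

    public class BaselineAdjustmentSketch {
      public static void main(String[] args) {
        long baselineTimeMs = 100_000L; // submit time of the first job in the trace
        long jobStartTime = 90_000L;    // this job was recorded before the baseline
        long jobFinishTime = 150_000L;

        jobStartTime -= baselineTimeMs;   // -10_000
        jobFinishTime -= baselineTimeMs;  //  50_000
        if (jobStartTime < 0) {
          // Clamp to zero but keep the original 60s duration, as the helper does.
          jobFinishTime -= jobStartTime;  // 60_000
          jobStartTime = 0;
        }
        System.out.println(jobStartTime + " -> " + jobFinishTime); // prints "0 -> 60000"
      }
    }
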
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionRumen.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionRumen.java
new file mode 100644
index 0000000..cc97a90
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionRumen.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.tools.rumen.LoggedJob;
+import org.apache.hadoop.tools.rumen.LoggedTask;
+import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
+import org.apache.hadoop.tools.rumen.datatypes.UserName;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.yarn.sls.AMDefinitionFactory.adjustTimeValuesToBaselineTime;
+
+public class AMDefinitionRumen extends AMDefinition {
+  public final static int DEFAULT_MAPPER_PRIORITY = 20;
+  private final static int DEFAULT_REDUCER_PRIORITY = 10;
+
+  public AMDefinitionRumen(AmDefinitionBuilder builder) {
+    super(builder);
+  }
+
+  public static List<ContainerSimulator> getTaskContainers(LoggedJob job,
+      SLSRunner slsRunner) throws YarnException {
+    List<ContainerSimulator> containerList = new ArrayList<>();
+
+    TaskContainerDefinition.Builder builder =
+        TaskContainerDefinition.Builder.create()
+            .withCount(1)
+            .withResource(slsRunner.getDefaultContainerResource())
+            .withExecutionType(ExecutionType.GUARANTEED)
+            .withAllocationId(-1)
+            .withRequestDelay(0);
+
+    // mapper
+    for (LoggedTask mapTask : job.getMapTasks()) {
+      if (mapTask.getAttempts().size() == 0) {
+        throw new YarnException("Invalid map task, no attempt for a mapper!");
+      }
+      LoggedTaskAttempt taskAttempt =
+          mapTask.getAttempts().get(mapTask.getAttempts().size() - 1);
+      TaskContainerDefinition containerDef = builder
+          .withHostname(taskAttempt.getHostName().getValue())
+          .withDuration(taskAttempt.getFinishTime() -
+              taskAttempt.getStartTime())
+          .withPriority(DEFAULT_MAPPER_PRIORITY)
+          .withType("map")
+          .build();
+      containerList.add(
+          ContainerSimulator.createFromTaskContainerDefinition(containerDef));
+    }
+
+    // reducer
+    for (LoggedTask reduceTask : job.getReduceTasks()) {
+      if (reduceTask.getAttempts().size() == 0) {
+        throw new YarnException(
+            "Invalid reduce task, no attempt for a reducer!");
+      }
+      LoggedTaskAttempt taskAttempt =
+          reduceTask.getAttempts().get(reduceTask.getAttempts().size() - 1);
+      TaskContainerDefinition containerDef = builder
+          .withHostname(taskAttempt.getHostName().getValue())
+          .withDuration(taskAttempt.getFinishTime() -
+              taskAttempt.getStartTime())
+          .withPriority(DEFAULT_REDUCER_PRIORITY)
+          .withType("reduce")
+          .build();
+      containerList.add(
+          ContainerSimulator.createFromTaskContainerDefinition(containerDef));
+    }
+
+    return containerList;
+  }
+
+
+  public static final class Builder extends AmDefinitionBuilder {
+    private long baselineTimeMs;
+
+    private Builder() {
+    }
+
+    public static Builder create() {
+      return new Builder();
+    }
+
+    public Builder withAmType(String amType) {
+      this.amType = amType;
+      return this;
+    }
+
+    public Builder withUser(UserName user) {
+      if (user != null) {
+        this.user = user.getValue();
+      }
+      return this;
+    }
+
+    public Builder withQueue(String queue) {
+      this.queue = queue;
+      return this;
+    }
+
+    public Builder withJobId(String oldJobId) {
+      this.jobId = oldJobId;
+      return this;
+    }
+
+    public Builder withJobStartTime(long time) {
+      this.jobStartTime = time;
+      return this;
+    }
+
+    public Builder withJobFinishTime(long time) {
+      this.jobFinishTime = time;
+      return this;
+    }
+
+    public Builder withBaseLineTimeMs(long baselineTimeMs) {
+      this.baselineTimeMs = baselineTimeMs;
+      return this;
+    }
+
+    public Builder withLabelExpression(String expr) {
+      this.labelExpression = expr;
+      return this;
+    }
+
+    public AMDefinitionRumen.Builder withTaskContainers(
+        List<ContainerSimulator> taskContainers) {
+      this.taskContainers = taskContainers;
+      return this;
+    }
+
+    public AMDefinitionRumen.Builder withAmResource(Resource amResource) {
+      this.amResource = amResource;
+      return this;
+    }
+
+    public AMDefinitionRumen build() {
+      AMDefinitionRumen amDef = new AMDefinitionRumen(this);
+
+      if (baselineTimeMs == 0) {
+        baselineTimeMs = jobStartTime;
+      }
+      adjustTimeValuesToBaselineTime(amDef, this, baselineTimeMs);
+      return amDef;
+    }
+
+  }
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSLS.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSLS.java
new file mode 100644
index 0000000..7439ddf
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSLS.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class AMDefinitionSLS extends AMDefinition {
+  public AMDefinitionSLS(AmDefinitionBuilder builder) {
+    super(builder);
+  }
+
+  public String getQueue() {
+    return queue;
+  }
+
+  public static List<ContainerSimulator> getTaskContainers(Map<?, ?> jsonJob,
+      SLSRunner slsRunner) throws YarnException {
+    List<Map<?, ?>> tasks = (List) jsonJob.get(SLSConfiguration.JOB_TASKS);
+    if (tasks == null || tasks.size() == 0) {
+      throw new YarnException("No task for the job!");
+    }
+
+    List<ContainerSimulator> containers = new ArrayList<>();
+    for (Map<?, ?> jsonTask : tasks) {
+      TaskContainerDefinition containerDef =
+          TaskContainerDefinition.Builder.create()
+              .withCount(jsonTask, SLSConfiguration.COUNT)
+              .withHostname((String) jsonTask.get(SLSConfiguration.TASK_HOST))
+              .withDuration(jsonTask, SLSConfiguration.TASK_DURATION_MS)
+              .withDurationLegacy(jsonTask, SLSConfiguration.DURATION_MS)
+              .withTaskStart(jsonTask, SLSConfiguration.TASK_START_MS)
+              .withTaskFinish(jsonTask, SLSConfiguration.TASK_END_MS)
+              .withResource(getResourceForContainer(jsonTask, slsRunner))
+              .withPriority(jsonTask, SLSConfiguration.TASK_PRIORITY)
+              .withType(jsonTask, SLSConfiguration.TASK_TYPE)
+              .withExecutionType(jsonTask, SLSConfiguration.TASK_EXECUTION_TYPE)
+              .withAllocationId(jsonTask, SLSConfiguration.TASK_ALLOCATION_ID)
+              .withRequestDelay(jsonTask, SLSConfiguration.TASK_REQUEST_DELAY)
+              .build();
+
+      for (int i = 0; i < containerDef.getCount(); i++) {
+        containers.add(ContainerSimulator.
+            createFromTaskContainerDefinition(containerDef));
+      }
+    }
+    return containers;
+  }
+
+  private static Resource getResourceForContainer(Map<?, ?> jsonTask,
+      SLSRunner slsRunner) {
+    Resource res = slsRunner.getDefaultContainerResource();
+    ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
+    for (ResourceInformation info : infors) {
+      if (jsonTask.containsKey(SLSConfiguration.TASK_PREFIX + info.getName())) {
+        long value = Long.parseLong(
+            jsonTask.get(SLSConfiguration.TASK_PREFIX + info.getName())
+                .toString());
+        res.setResourceValue(info.getName(), value);
+      }
+    }
+    return res;
+  }
+
+  public static final class Builder extends AmDefinitionBuilder {
+    private final Map<?, ?> jsonJob;
+
+    private Builder(Map<?, ?> jsonJob) {
+      this.jsonJob = jsonJob;
+    }
+
+    public static Builder create(Map<?, ?> jsonJob) {
+      return new Builder(jsonJob);
+    }
+
+    public Builder withAmType(String key) {
+      if (jsonJob.containsKey(key)) {
+        String amType = (String) jsonJob.get(key);
+        if (amType != null) {
+          this.amType = amType;
+        }
+      }
+      return this;
+    }
+
+    public Builder withUser(String key) {
+      if (jsonJob.containsKey(key)) {
+        String user = (String) jsonJob.get(key);
+        if (user != null) {
+          this.user = user;
+        }
+      }
+      return this;
+    }
+
+    public Builder withQueue(String key) {
+      if (jsonJob.containsKey(key)) {
+        this.queue = jsonJob.get(key).toString();
+      }
+      return this;
+    }
+
+    public Builder withJobId(String key) {
+      if (jsonJob.containsKey(key)) {
+        this.jobId = (String) jsonJob.get(key);
+      }
+      return this;
+    }
+
+    public Builder withJobCount(String key) {
+      if (jsonJob.containsKey(key)) {
+        jobCount = Integer.parseInt(jsonJob.get(key).toString());
+        jobCount = Math.max(jobCount, 1);
+      }
+      return this;
+    }
+
+    public Builder withJobStartTime(String key) {
+      if (jsonJob.containsKey(key)) {
+        this.jobStartTime = Long.parseLong(jsonJob.get(key).toString());
+      }
+      return this;
+    }
+
+    public Builder withJobFinishTime(String key) {
+      if (jsonJob.containsKey(key)) {
+        this.jobFinishTime = Long.parseLong(jsonJob.get(key).toString());
+      }
+      return this;
+    }
+
+    public Builder withLabelExpression(String key) {
+      if (jsonJob.containsKey(key)) {
+        this.labelExpression = jsonJob.get(key).toString();
+      }
+      return this;
+    }
+
+    public AMDefinitionSLS.Builder withTaskContainers(
+        List<ContainerSimulator> taskContainers) {
+      this.taskContainers = taskContainers;
+      return this;
+    }
+
+    public AMDefinitionSLS.Builder withAmResource(Resource amResource) {
+      this.amResource = amResource;
+      return this;
+    }
+
+    public AMDefinitionSLS build() {
+      AMDefinitionSLS amDef = new AMDefinitionSLS(this);
+      // Job id is generated automatically if this job configuration allows
+      // multiple job instances
+      if (jobCount > 1) {
+        amDef.oldAppId = null;
+      } else {
+        amDef.oldAppId = jobId;
+      }
+      amDef.jobCount = jobCount;
+      return amDef;
+    }
+  }
+
+}
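
Unlike the Rumen and Synth builders, the with* methods of AMDefinitionSLS.Builder take a JSON key and look the value up in the job map handed to create(). A hedged sketch that drives it from a hand-built map (values are hypothetical; the keys are the same SLSConfiguration constants the factory passes in):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.sls.AMDefinitionSLS;
    import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;

    public class AmDefinitionSlsSketch {
      public static void main(String[] args) {
        Map<String, String> jsonJob = new HashMap<>();
        jsonJob.put(SLSConfiguration.JOB_QUEUE_NAME, "sls_queue_1");
        jsonJob.put(SLSConfiguration.JOB_START_MS, "0");
        jsonJob.put(SLSConfiguration.JOB_END_MS, "60000");

        AMDefinitionSLS amDef = AMDefinitionSLS.Builder.create(jsonJob)
            .withQueue(SLSConfiguration.JOB_QUEUE_NAME)
            .withJobStartTime(SLSConfiguration.JOB_START_MS)
            .withJobFinishTime(SLSConfiguration.JOB_END_MS)
            .withAmResource(Resource.newInstance(1024, 1))
            .withTaskContainers(Collections.emptyList())
            .build();

        // Prints "sls_queue_1 runs for 60000 ms".
        System.out.println(amDef.getQueue() + " runs for "
            + (amDef.getJobFinishTime() - amDef.getJobStartTime()) + " ms");
      }
    }
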
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSynth.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSynth.java
new file mode 100644
index 0000000..db736f0
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSynth.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import static org.apache.hadoop.yarn.sls.AMDefinitionFactory.adjustTimeValuesToBaselineTime;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
+
+public class AMDefinitionSynth extends AMDefinition {
+  public AMDefinitionSynth(AmDefinitionBuilder builder) {
+    super(builder);
+  }
+
+  public static List<ContainerSimulator> getTaskContainers(
+      SynthJob job, SLSRunner slsRunner) throws YarnException {
+    List<ContainerSimulator> containerList = new ArrayList<>();
+    ArrayList<NodeId> keyAsArray = new ArrayList<>(
+        slsRunner.getNmMap().keySet());
+    Random rand = new Random(slsRunner.getStjp().getSeed());
+
+    for (SynthJob.SynthTask task : job.getTasks()) {
+      RMNode node = getRandomNode(slsRunner, keyAsArray, rand);
+      TaskContainerDefinition containerDef =
+          TaskContainerDefinition.Builder.create()
+              .withCount(1)
+              .withHostname("/" + node.getRackName() + "/" + node.getHostName())
+              .withDuration(task.getTime())
+              .withResource(Resource
+                  .newInstance((int) task.getMemory(), (int) task.getVcores()))
+              .withPriority(task.getPriority())
+              .withType(task.getType())
+              .withExecutionType(task.getExecutionType())
+              .withAllocationId(-1)
+              .withRequestDelay(0)
+              .build();
+      containerList.add(
+          ContainerSimulator.createFromTaskContainerDefinition(containerDef));
+    }
+
+    return containerList;
+  }
+
+  private static RMNode getRandomNode(SLSRunner slsRunner,
+      ArrayList<NodeId> keyAsArray, Random rand) {
+    int randomIndex = rand.nextInt(keyAsArray.size());
+    return slsRunner.getNmMap().get(keyAsArray.get(randomIndex)).getNode();
+  }
+
+  public static final class Builder extends AmDefinitionBuilder {
+    private long baselineTimeMs;
+
+    private Builder() {
+    }
+
+    public static Builder create() {
+      return new Builder();
+    }
+
+    public Builder withAmType(String amType) {
+      this.amType = amType;
+      return this;
+    }
+
+    public Builder withUser(String user) {
+      if (user != null) {
+        this.user = user;
+      }
+      return this;
+    }
+
+    public Builder withQueue(String queue) {
+      this.queue = queue;
+      return this;
+    }
+
+    public Builder withJobId(String oldJobId) {
+      this.jobId = oldJobId;
+      return this;
+    }
+
+    public Builder withJobStartTime(long time) {
+      this.jobStartTime = time;
+      return this;
+    }
+
+    public Builder withJobFinishTime(long time) {
+      this.jobFinishTime = time;
+      return this;
+    }
+
+    public Builder withBaseLineTimeMs(long baselineTimeMs) {
+      this.baselineTimeMs = baselineTimeMs;
+      return this;
+    }
+
+    public AMDefinitionSynth.Builder withLabelExpression(String expr) {
+      this.labelExpression = expr;
+      return this;
+    }
+
+    public AMDefinitionSynth.Builder withTaskContainers(
+        List<ContainerSimulator> taskContainers) {
+      this.taskContainers = taskContainers;
+      return this;
+    }
+
+    public AMDefinitionSynth.Builder withAmResource(Resource amResource) {
+      this.amResource = amResource;
+      return this;
+    }
+
+    public AMDefinitionSynth build() {
+      AMDefinitionSynth amDef = new AMDefinitionSynth(this);
+
+      if (baselineTimeMs == 0) {
+        baselineTimeMs = jobStartTime;
+      }
+      adjustTimeValuesToBaselineTime(amDef, this, baselineTimeMs);
+      return amDef;
+    }
+  }
+
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/JobDefinition.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/JobDefinition.java
new file mode 100644
index 0000000..4a39d37
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/JobDefinition.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import java.util.Map;
+
+public class JobDefinition {
+  private AMDefinition amDefinition;
+  private ReservationId reservationId;
+  private long deadline;
+  private Map<String, String> params;
+
+  public AMDefinition getAmDefinition() {
+    return amDefinition;
+  }
+
+  public ReservationId getReservationId() {
+    return reservationId;
+  }
+
+  public long getDeadline() {
+    return deadline;
+  }
+
+  //Currently unused
+  public Map<String, String> getParams() {
+    return params;
+  }
+
+  public static final class Builder {
+    private AMDefinition amDefinition;
+    private ReservationId reservationId;
+    private long deadline;
+    private Map<String, String> params;
+
+    private Builder() {
+    }
+
+    public static Builder create() {
+      return new Builder();
+    }
+
+    public Builder withAmDefinition(AMDefinition amDefinition) {
+      this.amDefinition = amDefinition;
+      return this;
+    }
+
+    public Builder withReservationId(ReservationId reservationId) {
+      this.reservationId = reservationId;
+      return this;
+    }
+
+    public Builder withDeadline(long deadline) {
+      this.deadline = deadline;
+      return this;
+    }
+
+    public Builder withParams(Map<String, String> params) {
+      this.params = params;
+      return this;
+    }
+
+    public JobDefinition build() {
+      JobDefinition jobDef = new JobDefinition();
+      jobDef.params = this.params;
+      jobDef.amDefinition = this.amDefinition;
+      jobDef.reservationId = this.reservationId;
+      jobDef.deadline = this.deadline;
+      return jobDef;
+    }
+  }
+}
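
JobDefinition bundles an AMDefinition with the reservation-related parameters that SLSRunner previously threaded through runNewAM() as separate arguments. A hedged sketch of the per-job wrapping the runner now performs (the AMDefinitionSynth values are hypothetical; deadline -1 and a null ReservationId mean "no reservation", mirroring startAMs() in the SLSRunner changes below):

    import java.util.Collections;
    import org.apache.hadoop.yarn.sls.AMDefinition;
    import org.apache.hadoop.yarn.sls.AMDefinitionSynth;
    import org.apache.hadoop.yarn.sls.JobDefinition;

    public class JobDefinitionSketch {
      public static void main(String[] args) {
        AMDefinition amDef = AMDefinitionSynth.Builder.create()
            .withQueue("default")
            .withJobStartTime(1_000L)
            .withJobFinishTime(2_000L)
            .withTaskContainers(Collections.emptyList())
            .build();

        JobDefinition jobDef = JobDefinition.Builder.create()
            .withAmDefinition(amDef)
            .withReservationId(null)
            .withParams(null)
            .withDeadline(-1)
            .build();

        // Prints "default".
        System.out.println(jobDef.getAmDefinition().getQueue());
      }
    }
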
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index a8d2aa6..83834e8 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -23,12 +23,10 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.Reader;
 import java.security.Security;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
@@ -59,13 +57,10 @@ import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.TableMapping;
 import org.apache.hadoop.tools.rumen.JobTraceReader;
 import org.apache.hadoop.tools.rumen.LoggedJob;
-import org.apache.hadoop.tools.rumen.LoggedTask;
-import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.NodeState;
@@ -89,7 +84,6 @@ import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler;
 import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
 import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
 import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler;
-import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
 import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
 import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
 import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
@@ -138,13 +132,8 @@ public class SLSRunner extends Configured implements Tool {
   // logger
   public final static Logger LOG = LoggerFactory.getLogger(SLSRunner.class);
 
-  private final static int DEFAULT_MAPPER_PRIORITY = 20;
-  private final static int DEFAULT_REDUCER_PRIORITY = 10;
-
   private static boolean exitAtTheFinish = false;
 
-  private static final String DEFAULT_USER = "default";
-
   /**
    * The type of trace in input.
    */
@@ -472,7 +461,10 @@ public class SLSRunner extends Configured implements Tool {
 
       while (jobIter.hasNext()) {
         try {
-          createAMForJob(jobIter.next());
+          Map jsonJob = jobIter.next();
+          AMDefinitionSLS amDef = AMDefinitionFactory.createFromSlsTrace(
+              jsonJob, this);
+          startAMs(amDef);
         } catch (Exception e) {
           LOG.error("Failed to create an AM: {}", e.getMessage());
         }
@@ -480,150 +472,29 @@ public class SLSRunner extends Configured implements Tool {
     }
   }
 
-  private void createAMForJob(Map jsonJob) throws YarnException {
-    long jobStartTime = Long.parseLong(
-        jsonJob.get(SLSConfiguration.JOB_START_MS).toString());
-
-    long jobFinishTime = 0;
-    if (jsonJob.containsKey(SLSConfiguration.JOB_END_MS)) {
-      jobFinishTime = Long.parseLong(
-          jsonJob.get(SLSConfiguration.JOB_END_MS).toString());
-    }
-
-    String jobLabelExpr = null;
-    if (jsonJob.containsKey(SLSConfiguration.JOB_LABEL_EXPR)) {
-      jobLabelExpr = jsonJob.get(SLSConfiguration.JOB_LABEL_EXPR).toString();
-    }
-
-    String user = (String) jsonJob.get(SLSConfiguration.JOB_USER);
-    if (user == null) {
-      user = "default";
-    }
-
-    String queue = jsonJob.get(SLSConfiguration.JOB_QUEUE_NAME).toString();
-    increaseQueueAppNum(queue);
-
-    String amType = (String)jsonJob.get(SLSConfiguration.AM_TYPE);
-    if (amType == null) {
-      amType = SLSUtils.DEFAULT_JOB_TYPE;
-    }
-
-    int jobCount = 1;
-    if (jsonJob.containsKey(SLSConfiguration.JOB_COUNT)) {
-      jobCount = Integer.parseInt(
-          jsonJob.get(SLSConfiguration.JOB_COUNT).toString());
-    }
-    jobCount = Math.max(jobCount, 1);
-
-    String oldAppId = (String)jsonJob.get(SLSConfiguration.JOB_ID);
-    // Job id is generated automatically if this job configuration allows
-    // multiple job instances
-    if(jobCount > 1) {
-      oldAppId = null;
-    }
-
-    for (int i = 0; i < jobCount; i++) {
-      runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime,
-          getTaskContainers(jsonJob), getAMContainerResource(jsonJob),
-          jobLabelExpr);
-    }
-  }
-
-  private List<ContainerSimulator> getTaskContainers(Map jsonJob)
-      throws YarnException {
-    List<ContainerSimulator> containers = new ArrayList<>();
-    List tasks = (List) jsonJob.get(SLSConfiguration.JOB_TASKS);
-    if (tasks == null || tasks.size() == 0) {
-      throw new YarnException("No task for the job!");
-    }
-
-    for (Object o : tasks) {
-      Map jsonTask = (Map) o;
-
-      String hostname = (String) jsonTask.get(SLSConfiguration.TASK_HOST);
-
-      long duration = 0;
-      if (jsonTask.containsKey(SLSConfiguration.TASK_DURATION_MS)) {
-        duration = Integer.parseInt(
-            jsonTask.get(SLSConfiguration.TASK_DURATION_MS).toString());
-      } else if (jsonTask.containsKey(SLSConfiguration.DURATION_MS)) {
-        // Also support "duration.ms" for backward compatibility
-        duration = Integer.parseInt(
-            jsonTask.get(SLSConfiguration.DURATION_MS).toString());
-      } else if (jsonTask.containsKey(SLSConfiguration.TASK_START_MS) &&
-          jsonTask.containsKey(SLSConfiguration.TASK_END_MS)) {
-        long taskStart = Long.parseLong(
-            jsonTask.get(SLSConfiguration.TASK_START_MS).toString());
-        long taskFinish = Long.parseLong(
-            jsonTask.get(SLSConfiguration.TASK_END_MS).toString());
-        duration = taskFinish - taskStart;
-      }
-      if (duration <= 0) {
-        throw new YarnException("Duration of a task shouldn't be less or equal"
-            + " to 0!");
-      }
-
-      Resource res = getResourceForContainer(jsonTask);
-
-      int priority = DEFAULT_MAPPER_PRIORITY;
-      if (jsonTask.containsKey(SLSConfiguration.TASK_PRIORITY)) {
-        priority = Integer.parseInt(
-            jsonTask.get(SLSConfiguration.TASK_PRIORITY).toString());
-      }
-
-      String type = "map";
-      if (jsonTask.containsKey(SLSConfiguration.TASK_TYPE)) {
-        type = jsonTask.get(SLSConfiguration.TASK_TYPE).toString();
-      }
-
-      int count = 1;
-      if (jsonTask.containsKey(SLSConfiguration.COUNT)) {
-        count = Integer.parseInt(
-            jsonTask.get(SLSConfiguration.COUNT).toString());
-      }
-      count = Math.max(count, 1);
-
-      ExecutionType executionType = ExecutionType.GUARANTEED;
-      if (jsonTask.containsKey(SLSConfiguration.TASK_EXECUTION_TYPE)) {
-        executionType = ExecutionType.valueOf(
-            jsonTask.get(SLSConfiguration.TASK_EXECUTION_TYPE).toString());
-      }
-      long allocationId = -1;
-      if (jsonTask.containsKey(SLSConfiguration.TASK_ALLOCATION_ID)) {
-        allocationId = Long.parseLong(
-            jsonTask.get(SLSConfiguration.TASK_ALLOCATION_ID).toString());
-      }
-
-      long requestDelay = 0;
-      if (jsonTask.containsKey(SLSConfiguration.TASK_REQUEST_DELAY)) {
-        requestDelay = Long.parseLong(
-            jsonTask.get(SLSConfiguration.TASK_REQUEST_DELAY).toString());
-      }
-      requestDelay = Math.max(requestDelay, 0);
-
-      for (int i = 0; i < count; i++) {
-        containers.add(
-            new ContainerSimulator(res, duration, hostname, priority, type,
-                executionType, allocationId, requestDelay));
-      }
+  private void startAMs(AMDefinition amDef) {
+    for (int i = 0; i < amDef.getJobCount(); i++) {
+      JobDefinition jobDef = JobDefinition.Builder.create()
+          .withAmDefinition(amDef)
+          .withDeadline(-1)
+          .withReservationId(null)
+          .withParams(null)
+          .build();
+      runNewAM(jobDef);
     }
-
-    return containers;
   }
 
-  private Resource getResourceForContainer(Map jsonTask) {
-    Resource res = getDefaultContainerResource();
-    ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
-    for (ResourceInformation info : infors) {
-      if (jsonTask.containsKey(SLSConfiguration.TASK_PREFIX + info.getName())) {
-        long value = Long.parseLong(
-            jsonTask.get(SLSConfiguration.TASK_PREFIX + info.getName())
-                .toString());
-        res.setResourceValue(info.getName(), value);
-      }
+  private void startAMs(AMDefinition amDef, ReservationId reservationId,
+      Map<String, String> params, long deadline) {
+    for (int i = 0; i < amDef.getJobCount(); i++) {
+      JobDefinition jobDef = JobDefinition.Builder.create()
+          .withAmDefinition(amDef)
+          .withReservationId(reservationId)
+          .withParams(params)
+          .withDeadline(deadline)
+          .build();
+      runNewAM(jobDef);
     }
-
-    return res;
   }
 
   /**
@@ -642,76 +513,19 @@ public class SLSRunner extends Configured implements Tool {
 
       while (job != null) {
         try {
-          createAMForJob(job, baselineTimeMS);
+          AMDefinitionRumen amDef =
+              AMDefinitionFactory.createFromRumenTrace(job, baselineTimeMS,
+                  this);
+          startAMs(amDef);
         } catch (Exception e) {
           LOG.error("Failed to create an AM", e);
         }
-
         job = reader.getNext();
       }
     }
   }
 
-  private void createAMForJob(LoggedJob job, long baselineTimeMs)
-      throws YarnException {
-    String user = job.getUser() == null ? "default" :
-        job.getUser().getValue();
-    String jobQueue = job.getQueue().getValue();
-    String oldJobId = job.getJobID().toString();
-    long jobStartTimeMS = job.getSubmitTime();
-    long jobFinishTimeMS = job.getFinishTime();
-    if (baselineTimeMs == 0) {
-      baselineTimeMs = job.getSubmitTime();
-    }
-    jobStartTimeMS -= baselineTimeMs;
-    jobFinishTimeMS -= baselineTimeMs;
-    if (jobStartTimeMS < 0) {
-      LOG.warn("Warning: reset job {} start time to 0.", oldJobId);
-      jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
-      jobStartTimeMS = 0;
-    }
-
-    increaseQueueAppNum(jobQueue);
-
-    List<ContainerSimulator> containerList = new ArrayList<>();
-    // mapper
-    for (LoggedTask mapTask : job.getMapTasks()) {
-      if (mapTask.getAttempts().size() == 0) {
-        throw new YarnException("Invalid map task, no attempt for a mapper!");
-      }
-      LoggedTaskAttempt taskAttempt =
-          mapTask.getAttempts().get(mapTask.getAttempts().size() - 1);
-      String hostname = taskAttempt.getHostName().getValue();
-      long containerLifeTime = taskAttempt.getFinishTime() -
-          taskAttempt.getStartTime();
-      containerList.add(
-          new ContainerSimulator(getDefaultContainerResource(),
-              containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map"));
-    }
-
-    // reducer
-    for (LoggedTask reduceTask : job.getReduceTasks()) {
-      if (reduceTask.getAttempts().size() == 0) {
-        throw new YarnException(
-            "Invalid reduce task, no attempt for a reducer!");
-      }
-      LoggedTaskAttempt taskAttempt =
-          reduceTask.getAttempts().get(reduceTask.getAttempts().size() - 1);
-      String hostname = taskAttempt.getHostName().getValue();
-      long containerLifeTime = taskAttempt.getFinishTime() -
-          taskAttempt.getStartTime();
-      containerList.add(
-          new ContainerSimulator(getDefaultContainerResource(),
-              containerLifeTime, hostname, DEFAULT_REDUCER_PRIORITY, "reduce"));
-    }
-
-    // Only supports the default job type currently
-    runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId,
-        jobStartTimeMS, jobFinishTimeMS, containerList,
-        getAMContainerResource(null));
-  }
-
-  private Resource getDefaultContainerResource() {
+  Resource getDefaultContainerResource() {
     int containerMemory = getConf().getInt(SLSConfiguration.CONTAINER_MEMORY_MB,
         SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT);
     int containerVCores = getConf().getInt(SLSConfiguration.CONTAINER_VCORES,
@@ -726,94 +540,26 @@ public class SLSRunner extends Configured implements Tool {
   private void startAMFromSynthGenerator() throws YarnException, IOException {
     Configuration localConf = new Configuration();
     localConf.set("fs.defaultFS", "file:///");
-    long baselineTimeMS = 0;
-
     // if we use the nodeFile this could have been not initialized yet.
     if (stjp == null) {
       stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
     }
 
-    SynthJob job = null;
+    SynthJob job;
     // we use stjp, a reference to the job producer instantiated during node
     // creation
     while ((job = (SynthJob) stjp.getNextJob()) != null) {
-      // only support MapReduce currently
-      String user = job.getUser() == null ? DEFAULT_USER :
-              job.getUser();
-      String jobQueue = job.getQueueName();
-      String oldJobId = job.getJobID().toString();
-      long jobStartTimeMS = job.getSubmissionTime();
-
-      // CARLO: Finish time is only used for logging, omit for now
-      long jobFinishTimeMS = jobStartTimeMS + job.getDuration();
-
-      if (baselineTimeMS == 0) {
-        baselineTimeMS = jobStartTimeMS;
-      }
-      jobStartTimeMS -= baselineTimeMS;
-      jobFinishTimeMS -= baselineTimeMS;
-      if (jobStartTimeMS < 0) {
-        LOG.warn("Warning: reset job {} start time to 0.", oldJobId);
-        jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
-        jobStartTimeMS = 0;
-      }
-
-      increaseQueueAppNum(jobQueue);
-
-      List<ContainerSimulator> containerList =
-          new ArrayList<ContainerSimulator>();
-      ArrayList<NodeId> keyAsArray = new ArrayList<NodeId>(nmMap.keySet());
-      Random rand = new Random(stjp.getSeed());
-
-      for (SynthJob.SynthTask task : job.getTasks()) {
-        RMNode node = nmMap.get(keyAsArray.get(rand.nextInt(keyAsArray.size())))
-            .getNode();
-        String hostname = "/" + node.getRackName() + "/" + node.getHostName();
-        long containerLifeTime = task.getTime();
-        Resource containerResource = Resource
-            .newInstance((int) task.getMemory(), (int) task.getVcores());
-        containerList.add(
-            new ContainerSimulator(containerResource, containerLifeTime,
-                hostname, task.getPriority(), task.getType(),
-                task.getExecutionType()));
-      }
-
-
       ReservationId reservationId = null;
-
-      if(job.hasDeadline()){
+      if (job.hasDeadline()) {
         reservationId = ReservationId
-            .newInstance(this.rm.getStartTime(), AM_ID);
-      }
-
-      runNewAM(job.getType(), user, jobQueue, oldJobId,
-          jobStartTimeMS, jobFinishTimeMS, containerList, reservationId,
-          job.getDeadline(), getAMContainerResource(null), null,
-          job.getParams());
-    }
-  }
-
-  private Resource getAMContainerResource(Map jsonJob) {
-    Resource amContainerResource =
-        SLSConfiguration.getAMContainerResource(getConf());
-
-    if (jsonJob == null) {
-      return amContainerResource;
-    }
-
-    ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
-    for (ResourceInformation info : infors) {
-      String key = SLSConfiguration.JOB_AM_PREFIX + info.getName();
-      if (jsonJob.containsKey(key)) {
-        long value = Long.parseLong(jsonJob.get(key).toString());
-        amContainerResource.setResourceValue(info.getName(), value);
+            .newInstance(rm.getStartTime(), AM_ID);
       }
+      AMDefinitionSynth amDef = AMDefinitionFactory.createFromSynth(job, this);
+      startAMs(amDef, reservationId, job.getParams(), job.getDeadline());
     }
-
-    return amContainerResource;
   }
 
-  private void increaseQueueAppNum(String queue) throws YarnException {
+  void increaseQueueAppNum(String queue) throws YarnException {
     SchedulerWrapper wrapper = (SchedulerWrapper)rm.getResourceScheduler();
     String queueName = wrapper.getRealQueueName(queue);
     Integer appNum = queueAppNumMap.get(queueName);
@@ -830,32 +576,16 @@ public class SLSRunner extends Configured implements Tool {
     }
   }
 
-  private void runNewAM(String jobType, String user,
-      String jobQueue, String oldJobId, long jobStartTimeMS,
-      long jobFinishTimeMS, List<ContainerSimulator> containerList,
-      Resource amContainerResource) {
-    runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS,
-        jobFinishTimeMS, containerList, null,  -1,
-        amContainerResource, null, null);
-  }
-
-  private void runNewAM(String jobType, String user,
-      String jobQueue, String oldJobId, long jobStartTimeMS,
-      long jobFinishTimeMS, List<ContainerSimulator> containerList,
-      Resource amContainerResource, String labelExpr) {
-    runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS,
-        jobFinishTimeMS, containerList, null,  -1,
-        amContainerResource, labelExpr, null);
+  private AMSimulator createAmSimulator(String jobType) {
+    return (AMSimulator) ReflectionUtils.newInstance(
+          amClassMap.get(jobType), new Configuration());
   }
 
-  @SuppressWarnings("checkstyle:parameternumber")
-  private void runNewAM(String jobType, String user,
-      String jobQueue, String oldJobId, long jobStartTimeMS,
-      long jobFinishTimeMS, List<ContainerSimulator> containerList,
-      ReservationId reservationId, long deadline, Resource amContainerResource,
-      String labelExpr, Map<String, String> params) {
-    AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
-        amClassMap.get(jobType), new Configuration());
+  private void runNewAM(JobDefinition jobDef) {
+    AMDefinition amDef = jobDef.getAmDefinition();
+    String oldJobId = amDef.getOldAppId();
+    AMSimulator amSim =
+        createAmSimulator(amDef.getAmType());
 
     if (amSim != null) {
       int heartbeatInterval = getConf().getInt(
@@ -867,19 +597,17 @@ public class SLSRunner extends Configured implements Tool {
         oldJobId = Integer.toString(AM_ID);
       }
       AM_ID++;
-      amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS,
-          jobFinishTimeMS, user, jobQueue, isTracked, oldJobId,
-          runner.getStartTimeMS(), amContainerResource, labelExpr, params,
-          appIdAMSim);
-      if(reservationId != null) {
+      amSim.init(amDef, rm, this, isTracked, runner.getStartTimeMS(), heartbeatInterval, appIdAMSim);
+      if (jobDef.getReservationId() != null) {
         // if we have a ReservationId, delegate reservation creation to
         // AMSim (reservation shape is impl specific)
         UTCClock clock = new UTCClock();
-        amSim.initReservation(reservationId, deadline, clock.getTime());
+        amSim.initReservation(jobDef.getReservationId(), jobDef.getDeadline(),
+            clock.getTime());
       }
       runner.schedule(amSim);
-      maxRuntime = Math.max(maxRuntime, jobFinishTimeMS);
-      numTasks += containerList.size();
+      maxRuntime = Math.max(maxRuntime, amDef.getJobFinishTime());
+      numTasks += amDef.getTaskContainers().size();
       amMap.put(oldJobId, amSim);
     }
   }
@@ -1121,4 +849,12 @@ public class SLSRunner extends Configured implements Tool {
       return result;
     }
   }
+
+  public ResourceManager getRm() {
+    return rm;
+  }
+
+  public SynthTraceJobProducer getStjp() {
+    return stjp;
+  }
 }
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/TaskContainerDefinition.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/TaskContainerDefinition.java
new file mode 100644
index 0000000..1b0cd90
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/TaskContainerDefinition.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.sls.AMDefinitionRumen.DEFAULT_MAPPER_PRIORITY;
+
+public class TaskContainerDefinition {
+  private long duration;
+  private Resource resource;
+  private int priority;
+  private String type;
+  private int count;
+  private ExecutionType executionType;
+  private long allocationId = -1;
+  private long requestDelay = 0;
+  private String hostname;
+
+  public long getDuration() {
+    return duration;
+  }
+
+  public Resource getResource() {
+    return resource;
+  }
+
+  public int getPriority() {
+    return priority;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public int getCount() {
+    return count;
+  }
+
+  public ExecutionType getExecutionType() {
+    return executionType;
+  }
+
+  public long getAllocationId() {
+    return allocationId;
+  }
+
+  public long getRequestDelay() {
+    return requestDelay;
+  }
+
+  public String getHostname() {
+    return hostname;
+  }
+
+  public static final class Builder {
+    private long duration = -1;
+    private long durationLegacy = -1;
+    private long taskStart = -1;
+    private long taskFinish = -1;
+    private Resource resource;
+    private int priority = DEFAULT_MAPPER_PRIORITY;
+    private String type = "map";
+    private int count = 1;
+    private ExecutionType executionType = ExecutionType.GUARANTEED;
+    private long allocationId = -1;
+    private long requestDelay = 0;
+    private String hostname;
+
+    public static Builder create() {
+      return new Builder();
+    }
+
+    public Builder withDuration(Map<?, ?> jsonTask, String key) {
+      if (jsonTask.containsKey(key)) {
+        this.duration = Integer.parseInt(jsonTask.get(key).toString());
+      }
+      return this;
+    }
+
+    public Builder withDuration(long duration) {
+      this.duration = duration;
+      return this;
+    }
+
+    /**
+     * Also support "duration.ms" for backward compatibility.
+     * @param jsonTask the json representation of the task.
+     * @param key The json key.
+     * @return the builder
+     */
+    public Builder withDurationLegacy(Map<?, ?> jsonTask, String key) {
+      if (jsonTask.containsKey(key)) {
+        this.durationLegacy = Integer.parseInt(jsonTask.get(key).toString());
+      }
+      return this;
+    }
+
+    public Builder withTaskStart(Map<?, ?> jsonTask, String key) {
+      if (jsonTask.containsKey(key)) {
+        this.taskStart = Long.parseLong(jsonTask.get(key).toString());
+      }
+      return this;
+    }
+
+    public Builder withTaskFinish(Map<?, ?> jsonTask, String key) {
+      if (jsonTask.containsKey(key)) {
+        this.taskFinish = Long.parseLong(jsonTask.get(key).toString());
+      }
+      return this;
+    }
+
+    public Builder withResource(Resource resource) {
+      this.resource = resource;
+      return this;
+    }
+
+    public Builder withPriority(Map<?, ?> jsonTask, String key) {
+      if (jsonTask.containsKey(key)) {
+        this.priority = Integer.parseInt(jsonTask.get(key).toString());
+      }
+      return this;
+    }
+
+    public Builder withPriority(int priority) {
+      this.priority = priority;
+      return this;
+    }
+
+    public Builder withType(Map<?, ?> jsonTask, String key) {
+      if (jsonTask.containsKey(key)) {
+        this.type = jsonTask.get(key).toString();
+      }
+      return this;
+    }
+
+    public Builder withType(String type) {
+      this.type = type;
+      return this;
+    }
+
+    public Builder withCount(Map<?, ?> jsonTask, String key) {
+      if (jsonTask.containsKey(key)) {
+        count = Integer.parseInt(jsonTask.get(key).toString());
+        count = Math.max(count, 1);
+      }
+      return this;
+    }
+
+    public Builder withCount(int count) {
+      this.count = count;
+      return this;
+    }
+
+    public Builder withExecutionType(Map<?, ?> jsonTask, String key) {
+      if (jsonTask.containsKey(key)) {
+        this.executionType = ExecutionType.valueOf(
+            jsonTask.get(key).toString());
+      }
+      return this;
+    }
+
+    public Builder withExecutionType(ExecutionType executionType) {
+      this.executionType = executionType;
+      return this;
+    }
+
+    public Builder withAllocationId(Map<?, ?> jsonTask, String key) {
+      if (jsonTask.containsKey(key)) {
+        this.allocationId = Long.parseLong(jsonTask.get(key).toString());
+      }
+      return this;
+    }
+
+    public Builder withAllocationId(long allocationId) {
+      this.allocationId = allocationId;
+      return this;
+    }
+
+    public Builder withRequestDelay(Map<?, ?> jsonTask, String key) {
+      if (jsonTask.containsKey(key)) {
+        requestDelay = Long.parseLong(jsonTask.get(key).toString());
+        requestDelay = Math.max(requestDelay, 0);
+      }
+      return this;
+    }
+
+    public Builder withRequestDelay(long requestDelay) {
+      this.requestDelay = requestDelay;
+      return this;
+    }
+
+    public Builder withHostname(String hostname) {
+      this.hostname = hostname;
+      return this;
+    }
+
+    public TaskContainerDefinition build() throws YarnException {
+      TaskContainerDefinition taskContainerDef =
+          new TaskContainerDefinition();
+      taskContainerDef.duration = validateAndGetDuration(this);
+      taskContainerDef.resource = this.resource;
+      taskContainerDef.type = this.type;
+      taskContainerDef.requestDelay = this.requestDelay;
+      taskContainerDef.priority = this.priority;
+      taskContainerDef.count = this.count;
+      taskContainerDef.allocationId = this.allocationId;
+      taskContainerDef.executionType = this.executionType;
+      taskContainerDef.hostname = this.hostname;
+      return taskContainerDef;
+    }
+
+    private long validateAndGetDuration(Builder builder) throws YarnException {
+      long duration = 0;
+
+      if (builder.duration != -1) {
+        duration = builder.duration;
+      } else if (builder.durationLegacy != -1) {
+        duration = builder.durationLegacy;
+      } else if (builder.taskStart != -1 && builder.taskFinish != -1) {
+        duration = builder.taskFinish - builder.taskStart;
+      }
+
+      if (duration <= 0) {
+        throw new YarnException(
+            "Duration of a task shouldn't be less than or equal to 0!");
+      }
+      return duration;
+    }
+  }
+}
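
A minimal usage sketch of the new builder (not part of this patch; the values are illustrative), using only the plain-value setters defined above:

    TaskContainerDefinition mapTask = TaskContainerDefinition.Builder.create()
        .withDuration(10000)                              // task runtime in ms
        .withResource(Resources.createResource(1024, 1))  // memory MB, vcores
        .withType("map")
        .withCount(4)
        .withExecutionType(ExecutionType.GUARANTEED)
        .withHostname("/default-rack/node1")
        .build();   // throws YarnException if no positive duration can be derived

The Map-based overloads (e.g. withDuration(jsonTask, key), withCount(jsonTask, key)) follow the same pattern: they only set the field when the key is present in the JSON task, otherwise the Builder defaults listed above apply.
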
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
index 922f9a2..0a87a6c 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.sls.AMDefinition;
 import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
@@ -128,27 +129,25 @@ public abstract class AMSimulator extends TaskRunner.Task {
     this.responseQueue = new LinkedBlockingQueue<>();
   }
 
-  @SuppressWarnings("checkstyle:parameternumber")
-  public void init(int heartbeatInterval,
-      List<ContainerSimulator> containerList, ResourceManager resourceManager,
-      SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
-      String simQueue, boolean tracked, String oldApp, long baseTimeMS,
-      Resource amResource, String nodeLabelExpr, Map<String, String> params,
-      Map<ApplicationId, AMSimulator> appIdAMSim) {
-    super.init(startTime, startTime + 1000000L * heartbeatInterval,
-        heartbeatInterval);
-    this.user = simUser;
-    this.rm = resourceManager;
-    this.se = slsRunnner;
-    this.queue = simQueue;
-    this.oldAppId = oldApp;
+  public void init(AMDefinition amDef, ResourceManager rm, SLSRunner slsRunner,
+      boolean tracked, long baselineTimeMS, long heartbeatInterval,
+      Map<ApplicationId, AMSimulator> appIdToAMSim) {
+    long startTime = amDef.getJobStartTime();
+    long endTime = startTime + 1000000L * heartbeatInterval;
+    super.init(startTime, endTime, heartbeatInterval);
+
+    this.user = amDef.getUser();
+    this.queue = amDef.getQueue();
+    this.oldAppId = amDef.getOldAppId();
+    this.amContainerResource = amDef.getAmResource();
+    this.nodeLabelExpression = amDef.getLabelExpression();
+    this.traceStartTimeMS = amDef.getJobStartTime();
+    this.traceFinishTimeMS = amDef.getJobFinishTime();
+    this.rm = rm;
+    this.se = slsRunner;
     this.isTracked = tracked;
-    this.baselineTimeMS = baseTimeMS;
-    this.traceStartTimeMS = startTime;
-    this.traceFinishTimeMS = finishTime;
-    this.amContainerResource = amResource;
-    this.nodeLabelExpression = nodeLabelExpr;
-    this.appIdToAMSim = appIdAMSim;
+    this.baselineTimeMS = baselineTimeMS;
+    this.appIdToAMSim = appIdToAMSim;
   }
 
   /**
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java
index 83467e0..418408d 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java
@@ -32,10 +32,10 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.sls.AMDefinition;
 import org.apache.hadoop.yarn.sls.SLSRunner;
 import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
 import org.slf4j.Logger;
@@ -93,19 +93,15 @@ public class DAGAMSimulator extends AMSimulator {
       LoggerFactory.getLogger(DAGAMSimulator.class);
 
   @SuppressWarnings("checkstyle:parameternumber")
-  public void init(int heartbeatInterval,
-      List<ContainerSimulator> containerList, ResourceManager resourceManager,
-      SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
-      String simQueue, boolean tracked, String oldApp, long baseTimeMS,
-      Resource amResource, String nodeLabelExpr, Map<String, String> params,
-      Map<ApplicationId, AMSimulator> appIdAMSim) {
-    super.init(heartbeatInterval, containerList, resourceManager, slsRunnner,
-        startTime, finishTime, simUser, simQueue, tracked, oldApp, baseTimeMS,
-        amResource, nodeLabelExpr, params, appIdAMSim);
+  public void init(AMDefinition amDef, ResourceManager rm, SLSRunner slsRunner,
+      boolean tracked, long baselineTimeMS, long heartbeatInterval,
+      Map<ApplicationId, AMSimulator> appIdToAMSim) {
+    super.init(amDef, rm, slsRunner, tracked, baselineTimeMS, heartbeatInterval,
+        appIdToAMSim);
     super.amtype = "dag";
 
-    allContainers.addAll(containerList);
-    pendingContainers.addAll(containerList);
+    allContainers.addAll(amDef.getTaskContainers());
+    pendingContainers.addAll(amDef.getTaskContainers());
     totalContainers = allContainers.size();
 
     LOG.info("Added new job with {} containers", allContainers.size());
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
index 184fdca..976c022 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.sls.AMDefinition;
 import org.apache.hadoop.yarn.sls.ReservationClientUtil;
 import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
 import org.apache.hadoop.yarn.sls.SLSRunner;
@@ -123,19 +124,15 @@ public class MRAMSimulator extends AMSimulator {
       LoggerFactory.getLogger(MRAMSimulator.class);
 
   @SuppressWarnings("checkstyle:parameternumber")
-  public void init(int heartbeatInterval,
-      List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
-      long traceStartTime, long traceFinishTime, String user, String queue,
-      boolean isTracked, String oldAppId, long baselineStartTimeMS,
-      Resource amContainerResource, String nodeLabelExpr,
-      Map<String, String> params, Map<ApplicationId, AMSimulator> appIdAMSim) {
-    super.init(heartbeatInterval, containerList, rm, se, traceStartTime,
-        traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS,
-        amContainerResource, nodeLabelExpr, params, appIdAMSim);
+  public void init(AMDefinition amDef, ResourceManager rm, SLSRunner slsRunner,
+      boolean tracked, long baselineTimeMS, long heartbeatInterval,
+      Map<ApplicationId, AMSimulator> appIdToAMSim) {
+    super.init(amDef, rm, slsRunner, tracked, baselineTimeMS,
+        heartbeatInterval, appIdToAMSim);
     amtype = "mapreduce";
 
     // get map/reduce tasks
-    for (ContainerSimulator cs : containerList) {
+    for (ContainerSimulator cs : amDef.getTaskContainers()) {
       if (cs.getType().equals("map")) {
         cs.setPriority(PRIORITY_MAP);
         allMaps.add(cs);
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
index 7e35451..09297af 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
@@ -30,11 +30,11 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.sls.AMDefinition;
 import org.apache.hadoop.yarn.sls.SLSRunner;
 import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
 import org.slf4j.Logger;
@@ -93,21 +93,14 @@ public class StreamAMSimulator extends AMSimulator {
       LoggerFactory.getLogger(StreamAMSimulator.class);
 
   @SuppressWarnings("checkstyle:parameternumber")
-  public void init(int heartbeatInterval,
-      List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
-      long traceStartTime, long traceFinishTime, String user, String queue,
-      boolean isTracked, String oldAppId, long baselineStartTimeMS,
-      Resource amContainerResource, String nodeLabelExpr,
-      Map<String, String> params, Map<ApplicationId, AMSimulator> appIdAMSim) {
-    super.init(heartbeatInterval, containerList, rm, se, traceStartTime,
-        traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS,
-        amContainerResource, nodeLabelExpr, params, appIdAMSim);
+  public void init(AMDefinition amDef, ResourceManager rm, SLSRunner slsRunner,
+      boolean tracked, long baselineTimeMS, long heartbeatInterval,
+      Map<ApplicationId, AMSimulator> appIdToAMSim) {
+    super.init(amDef, rm, slsRunner, tracked, baselineTimeMS,
+        heartbeatInterval, appIdToAMSim);
     amtype = "stream";
-
-    allStreams.addAll(containerList);
-
-    duration = traceFinishTime - traceStartTime;
-
+    allStreams.addAll(amDef.getTaskContainers());
+    duration = amDef.getJobFinishTime() - amDef.getJobStartTime();
     LOG.info("Added new job with {} streams, running for {}",
         allStreams.size(), duration);
   }
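
With this change all simulators (AMSimulator, DAGAMSimulator, MRAMSimulator, StreamAMSimulator) share the same seven-argument init contract. A hedged sketch of a call site, mirroring what the updated tests below do (names are illustrative):

    // amDef carries user, queue, start/finish times, AM resource and task containers
    amSimulator.init(amDef, resourceManager, slsRunner,
        true,           // tracked
        0,              // baselineTimeMS
        1000,           // heartbeatInterval (ms)
        appIdToAMSim);  // shared ApplicationId -> AMSimulator map
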
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java
index e83ee91..8f11994 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java
@@ -26,54 +26,41 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.sls.TaskContainerDefinition;
 
 @Private
 @Unstable
 public class ContainerSimulator implements Delayed {
-  // id
   private ContainerId id;
-  // resource allocated
   private Resource resource;
-  // end time
   private long endTime;
   // life time (ms)
   private long lifeTime;
   // time(ms) after which container would be requested by AM
   private long requestDelay;
-  // host name
   private String hostname;
-  // priority
   private int priority;
-  // type 
   private String type;
-  // execution type
   private ExecutionType executionType = ExecutionType.GUARANTEED;
-  // allocation id
   private long allocationId;
 
   /**
-   * invoked when AM schedules containers to allocate.
+   * Invoked when AM schedules containers to allocate.
+   * @param def The task's definition object.
+   * @return the ContainerSimulator created from the definition.
    */
-  public ContainerSimulator(Resource resource, long lifeTime,
-      String hostname, int priority, String type) {
-    this(resource, lifeTime, hostname, priority, type,
-        ExecutionType.GUARANTEED);
+  public static ContainerSimulator createFromTaskContainerDefinition(
+      TaskContainerDefinition def) {
+    return new ContainerSimulator(def.getResource(), def.getDuration(),
+        def.getHostname(), def.getPriority(), def.getType(),
+        def.getExecutionType(), def.getAllocationId(), def.getRequestDelay());
   }
 
   /**
-   * invoked when AM schedules containers to allocate.
-   */
-  public ContainerSimulator(Resource resource, long lifeTime,
-      String hostname, int priority, String type, ExecutionType executionType) {
-    this(resource, lifeTime, hostname, priority, type,
-        executionType, -1, 0);
-  }
-
-  /**
-   * invoked when AM schedules containers to allocate.
+   * Invoked when AM schedules containers to allocate.
    */
   @SuppressWarnings("checkstyle:parameternumber")
-  public ContainerSimulator(Resource resource, long lifeTime,
+  private ContainerSimulator(Resource resource, long lifeTime,
       String hostname, int priority, String type, ExecutionType executionType,
       long allocationId, long requestDelay) {
     this.resource = resource;
@@ -87,7 +74,7 @@ public class ContainerSimulator implements Delayed {
   }
 
   /**
-   * invoke when NM schedules containers to run.
+   * Invoked when NM schedules containers to run.
    */
   public ContainerSimulator(ContainerId id, Resource resource, long endTime,
       long lifeTime, long allocationId) {
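
Since the resource/lifetime constructors are now private, ContainerSimulator instances for AM-scheduled containers are obtained through the factory method. A short sketch, under the same illustrative assumptions as above:

    TaskContainerDefinition def = TaskContainerDefinition.Builder.create()
        .withResource(Resources.createResource(1024))
        .withDuration(100)
        .withType("map")
        .withHostname("/default-rack/h1")
        .build();
    ContainerSimulator cs =
        ContainerSimulator.createFromTaskContainerDefinition(def);

The ContainerId-based constructor used when an NM starts a container stays public and unchanged.
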
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
index 256dcf4..e529d18 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
@@ -57,8 +57,6 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 @Private
 @Unstable
 public class SLSUtils {
-  public final static String DEFAULT_JOB_TYPE = "mapreduce";
-
   private static final String LABEL_FORMAT_ERR_MSG =
       "Input format for adding node-labels is not correct, it should be "
           + "labelName1[(exclusive=true/false)],labelName2[] ..";
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestDagAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestDagAMSimulator.java
index 8ac7fff..e458b86 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestDagAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestDagAMSimulator.java
@@ -26,6 +26,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for DagAMSimulator.
@@ -74,7 +76,17 @@ public class TestDagAMSimulator {
 
   private ContainerSimulator createContainerSim(long allocationId,
       long requestDelay) {
-    return new ContainerSimulator(null, 1000, "*", 1, "Map",
-        null, allocationId, requestDelay);
+    TaskContainerDefinition taskContainerDef =
+        mock(TaskContainerDefinition.class);
+    when(taskContainerDef.getResource()).thenReturn(null);
+    when(taskContainerDef.getDuration()).thenReturn(1000L);
+    when(taskContainerDef.getHostname()).thenReturn("*");
+    when(taskContainerDef.getPriority()).thenReturn(1);
+    when(taskContainerDef.getType()).thenReturn("Map");
+    when(taskContainerDef.getExecutionType()).thenReturn(null);
+    when(taskContainerDef.getAllocationId()).thenReturn(allocationId);
+    when(taskContainerDef.getRequestDelay()).thenReturn(requestDelay);
+    return ContainerSimulator.createFromTaskContainerDefinition(
+        taskContainerDef);
   }
 }
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
index 50ac700..f5db168 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.sls.appmaster;
 import com.codahale.metrics.MetricRegistry;
 import java.util.HashMap;
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.tools.rumen.datatypes.UserName;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -33,6 +34,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.sls.AMDefinitionRumen;
+import org.apache.hadoop.yarn.sls.TaskContainerDefinition;
 import org.apache.hadoop.yarn.sls.SLSRunner;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
 import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
@@ -57,6 +60,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 @RunWith(Parameterized.class)
@@ -157,9 +161,20 @@ public class TestAMSimulator {
     String queue = "default";
     List<ContainerSimulator> containers = new ArrayList<>();
     HashMap<ApplicationId, AMSimulator> map = new HashMap<>();
-    app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true,
-        appId, 0, SLSConfiguration.getAMContainerResource(conf), null, null,
-        map);
+
+    UserName mockUser = mock(UserName.class);
+    when(mockUser.getValue()).thenReturn("user1");
+    AMDefinitionRumen amDef =
+        AMDefinitionRumen.Builder.create()
+        .withUser(mockUser)
+        .withQueue(queue)
+        .withJobId(appId)
+        .withJobStartTime(0)
+        .withJobFinishTime(1000000L)
+        .withAmResource(SLSConfiguration.getAMContainerResource(conf))
+        .withTaskContainers(containers)
+        .build();
+    app.init(amDef, rm, null, true, 0, 1000, map);
     app.firstStep();
 
     verifySchedulerMetrics(appId);
@@ -184,9 +199,21 @@ public class TestAMSimulator {
       String queue = "default";
       List<ContainerSimulator> containers = new ArrayList<>();
       HashMap<ApplicationId, AMSimulator> map = new HashMap<>();
-      app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true,
-          appId, 0, SLSConfiguration.getAMContainerResource(conf), "label1",
-          null, map);
+
+      UserName mockUser = mock(UserName.class);
+      when(mockUser.getValue()).thenReturn("user1");
+      AMDefinitionRumen amDef =
+          AMDefinitionRumen.Builder.create()
+              .withUser(mockUser)
+              .withQueue(queue)
+              .withJobId(appId)
+              .withJobStartTime(0)
+              .withJobFinishTime(1000000L)
+              .withAmResource(SLSConfiguration.getAMContainerResource(conf))
+              .withTaskContainers(containers)
+              .withLabelExpression("label1")
+              .build();
+      app.init(amDef, rm, null, true, 0, 1000, map);
       app.firstStep();
 
       verifySchedulerMetrics(appId);
@@ -201,7 +228,7 @@ public class TestAMSimulator {
   }
 
   @Test
-  public void testPackageRequests() {
+  public void testPackageRequests() throws YarnException {
     MockAMSimulator app = new MockAMSimulator();
     List<ContainerSimulator> containerSimulators = new ArrayList<>();
     Resource resource = Resources.createResource(1024);
@@ -209,12 +236,25 @@ public class TestAMSimulator {
     ExecutionType execType = ExecutionType.GUARANTEED;
     String type = "map";
 
-    ContainerSimulator s1 = new ContainerSimulator(resource, 100,
-        "/default-rack/h1", priority, type, execType);
-    ContainerSimulator s2 = new ContainerSimulator(resource, 100,
-        "/default-rack/h1", priority, type, execType);
-    ContainerSimulator s3 = new ContainerSimulator(resource, 100,
-        "/default-rack/h2", priority, type, execType);
+    TaskContainerDefinition.Builder builder =
+        TaskContainerDefinition.Builder.create()
+        .withResource(resource)
+        .withDuration(100)
+        .withPriority(1)
+        .withType(type)
+        .withExecutionType(execType)
+        .withAllocationId(-1)
+        .withRequestDelay(0);
+
+    ContainerSimulator s1 = ContainerSimulator
+        .createFromTaskContainerDefinition(
+            builder.withHostname("/default-rack/h1").build());
+    ContainerSimulator s2 = ContainerSimulator
+        .createFromTaskContainerDefinition(
+            builder.withHostname("/default-rack/h1").build());
+    ContainerSimulator s3 = ContainerSimulator
+        .createFromTaskContainerDefinition(
+            builder.withHostname("/default-rack/h2").build());
 
     containerSimulators.add(s1);
     containerSimulators.add(s2);
@@ -250,12 +290,15 @@ public class TestAMSimulator {
     Assert.assertEquals(2, nodeRequestCount);
 
     containerSimulators.clear();
-    s1 = new ContainerSimulator(resource, 100,
-        "/default-rack/h1", priority, type, execType, 1, 0);
-    s2 = new ContainerSimulator(resource, 100,
-        "/default-rack/h1", priority, type, execType, 2, 0);
-    s3 = new ContainerSimulator(resource, 100,
-        "/default-rack/h2", priority, type, execType, 1, 0);
+    s1 = ContainerSimulator.createFromTaskContainerDefinition(
+        createDefaultTaskContainerDefMock(resource, priority, execType, type,
+            "/default-rack/h1", 1));
+    s2 = ContainerSimulator.createFromTaskContainerDefinition(
+        createDefaultTaskContainerDefMock(resource, priority, execType, type,
+            "/default-rack/h1", 2));
+    s3 = ContainerSimulator.createFromTaskContainerDefinition(
+        createDefaultTaskContainerDefMock(resource, priority, execType, type,
+            "/default-rack/h2", 1));
 
     containerSimulators.add(s1);
     containerSimulators.add(s2);
@@ -317,6 +360,20 @@ public class TestAMSimulator {
     Assert.assertFalse(nm.getNode().getRunningApps().contains(app.appId));
     Assert.assertTrue(nm.getNode().getRunningApps().isEmpty());
   }
+  private TaskContainerDefinition createDefaultTaskContainerDefMock(
+      Resource resource, int priority, ExecutionType execType, String type,
+      String hostname, long allocationId) {
+    TaskContainerDefinition taskContainerDef =
+        mock(TaskContainerDefinition.class);
+    when(taskContainerDef.getResource()).thenReturn(resource);
+    when(taskContainerDef.getDuration()).thenReturn(100L);
+    when(taskContainerDef.getPriority()).thenReturn(priority);
+    when(taskContainerDef.getType()).thenReturn(type);
+    when(taskContainerDef.getExecutionType()).thenReturn(execType);
+    when(taskContainerDef.getHostname()).thenReturn(hostname);
+    when(taskContainerDef.getAllocationId()).thenReturn(allocationId);
+    return taskContainerDef;
+  }
 
   @After
   public void tearDown() {
