You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ab...@apache.org on 2019/08/29 06:13:58 UTC

[hadoop] branch trunk updated: YARN-9754. Add support for arbitrary DAG AM Simulator. Contributed by Abhishek Modi.

This is an automated email from the ASF dual-hosted git repository.

abmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 16576fd  YARN-9754. Add support for arbitrary DAG AM Simulator. Contributed by Abhishek Modi.
16576fd is described below

commit 16576fde8e3697025e1d37a4c21f8ad904208f8e
Author: Abhishek Modi <ab...@apache.org>
AuthorDate: Thu Aug 29 11:43:40 2019 +0530

    YARN-9754. Add support for arbitrary DAG AM Simulator. Contributed by Abhishek Modi.
---
 hadoop-tools/hadoop-sls/pom.xml                    |   1 +
 .../java/org/apache/hadoop/yarn/sls/SLSRunner.java |  10 +-
 .../hadoop/yarn/sls/appmaster/DAGAMSimulator.java  | 285 +++++++++++++++++++++
 .../hadoop/yarn/sls/conf/SLSConfiguration.java     |   3 +
 .../yarn/sls/scheduler/ContainerSimulator.java     |  13 +-
 .../apache/hadoop/yarn/sls/TestDagAMSimulator.java |  80 ++++++
 .../hadoop/yarn/sls/TestSLSDagAMSimulator.java     |  76 ++++++
 .../hadoop/yarn/sls/appmaster/TestAMSimulator.java |   6 +-
 .../hadoop-sls/src/test/resources/sls-runner.xml   |   4 +
 .../hadoop-sls/src/test/resources/sls_dag.json     |  62 +++++
 10 files changed, 534 insertions(+), 6 deletions(-)

diff --git a/hadoop-tools/hadoop-sls/pom.xml b/hadoop-tools/hadoop-sls/pom.xml
index 0df8e78..848b8c5 100644
--- a/hadoop-tools/hadoop-sls/pom.xml
+++ b/hadoop-tools/hadoop-sls/pom.xml
@@ -139,6 +139,7 @@
             <exclude>src/test/resources/nodes-with-resources.json</exclude>
             <exclude>src/test/resources/exit-invariants.txt</exclude>
             <exclude>src/test/resources/ongoing-invariants.txt</exclude>
+            <exclude>src/test/resources/sls_dag.json</exclude>
           </excludes>
         </configuration>
       </plugin>
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 3257915..6ed28d9 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -563,10 +563,18 @@ public class SLSRunner extends Configured implements Tool {
         allocationId = Long.parseLong(
             jsonTask.get(SLSConfiguration.TASK_ALLOCATION_ID).toString());
       }
+
+      long requestDelay = 0;
+      if (jsonTask.containsKey(SLSConfiguration.TASK_REQUEST_DELAY)) {
+        requestDelay = Long.parseLong(
+            jsonTask.get(SLSConfiguration.TASK_REQUEST_DELAY).toString());
+      }
+      requestDelay = Math.max(requestDelay, 0);
+
       for (int i = 0; i < count; i++) {
         containers.add(
             new ContainerSimulator(res, duration, hostname, priority, type,
-                executionType, allocationId));
+                executionType, allocationId, requestDelay));
       }
     }
 
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java
new file mode 100644
index 0000000..7e2d0a7
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.yarn.sls.appmaster;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.sls.SLSRunner;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * AMSimulator that simulates a DAG application - it requests containers
+ * based on the request delay specified for each task. It finishes when
+ * all the tasks are completed.
+ *
+ * Vocabulary Used:
+ * pending -> requests which are NOT yet sent to RM.
+ * scheduled -> requests which are sent to RM but not yet assigned.
+ * assigned -> requests which are assigned to a container.
+ * completed -> requests whose assigned container has completed.
+ * Containers are requested based on the request delay.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class DAGAMSimulator extends AMSimulator {
+
+  private static final int PRIORITY = 20;
+
+  private List<ContainerSimulator> pendingContainers =
+      new LinkedList<>();
+
+  private List<ContainerSimulator> scheduledContainers =
+      new LinkedList<>();
+
+  private Map<ContainerId, ContainerSimulator> assignedContainers =
+      new HashMap<>();
+
+  private List<ContainerSimulator> completedContainers =
+      new LinkedList<>();
+
+  private List<ContainerSimulator> allContainers =
+      new LinkedList<>();
+
+  private boolean isFinished = false;
+
+  private long amStartTime;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DAGAMSimulator.class);
+
+  @SuppressWarnings("checkstyle:parameternumber")
+  public void init(int heartbeatInterval,
+      List<ContainerSimulator> containerList, ResourceManager resourceManager,
+      SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
+      String simQueue, boolean tracked, String oldApp, long baseTimeMS,
+      Resource amResource, String nodeLabelExpr, Map<String, String> params,
+      Map<ApplicationId, AMSimulator> appIdAMSim) {
+    super.init(heartbeatInterval, containerList, resourceManager, slsRunnner,
+        startTime, finishTime, simUser, simQueue, tracked, oldApp, baseTimeMS,
+        amResource, nodeLabelExpr, params, appIdAMSim);
+    super.amtype = "dag";
+
+    allContainers.addAll(containerList);
+    pendingContainers.addAll(containerList);
+    totalContainers = allContainers.size();
+
+    LOG.info("Added new job with {} containers", allContainers.size());
+  }
+
+  @Override
+  public void firstStep() throws Exception {
+    super.firstStep();
+    amStartTime = System.currentTimeMillis();
+  }
+
+  @Override
+  public void initReservation(ReservationId reservationId,
+      long deadline, long now) {
+    // DAG AM doesn't support reservation
+    setReservationRequest(null);
+  }
+
+  @Override
+  public synchronized void notifyAMContainerLaunched(Container masterContainer)
+      throws Exception {
+    if (null != masterContainer) {
+      restart();
+      super.notifyAMContainerLaunched(masterContainer);
+    }
+  }
+
+  protected void processResponseQueue() throws Exception {
+    while (!responseQueue.isEmpty()) {
+      AllocateResponse response = responseQueue.take();
+
+      // check completed containers
+      if (!response.getCompletedContainersStatuses().isEmpty()) {
+        for (ContainerStatus cs : response.getCompletedContainersStatuses()) {
+          ContainerId containerId = cs.getContainerId();
+          if (cs.getExitStatus() == ContainerExitStatus.SUCCESS) {
+            if (assignedContainers.containsKey(containerId)) {
+              LOG.debug("Application {} has one container finished ({}).",
+                  appId, containerId);
+              ContainerSimulator containerSimulator =
+                  assignedContainers.remove(containerId);
+              finishedContainers++;
+              completedContainers.add(containerSimulator);
+            } else if (amContainer.getId().equals(containerId)) {
+              // am container released event
+              isFinished = true;
+              LOG.info("Application {} goes to finish.", appId);
+            }
+            if (finishedContainers >= totalContainers) {
+              lastStep();
+            }
+          } else {
+            // container did not exit successfully (e.g. it was killed)
+            if (assignedContainers.containsKey(containerId)) {
+              LOG.error("Application {} has one container killed ({}).", appId,
+                  containerId);
+              pendingContainers.add(assignedContainers.remove(containerId));
+            } else if (amContainer.getId().equals(containerId)) {
+              LOG.error("Application {}'s AM is "
+                  + "going to be killed. Waiting for rescheduling...", appId);
+            }
+          }
+        }
+      }
+
+      // check finished
+      if (isAMContainerRunning &&
+          (finishedContainers >= totalContainers)) {
+        isAMContainerRunning = false;
+        LOG.info("Application {} sends out event to clean up"
+            + " its AM container.", appId);
+        isFinished = true;
+        break;
+      }
+
+      // check allocated containers
+      for (Container container : response.getAllocatedContainers()) {
+        if (!scheduledContainers.isEmpty()) {
+          ContainerSimulator cs = scheduledContainers.remove(0);
+          LOG.debug("Application {} starts to launch a container ({}).",
+              appId, container.getId());
+          assignedContainers.put(container.getId(), cs);
+          se.getNmMap().get(container.getNodeId())
+              .addNewContainer(container, cs.getLifeTime());
+        }
+      }
+    }
+  }
+
+  @Override
+  protected void sendContainerRequest() throws Exception {
+    if (isFinished) {
+      return;
+    }
+    // send out request
+    List<ResourceRequest> ask = null;
+    if (finishedContainers != totalContainers) {
+      if (!pendingContainers.isEmpty()) {
+        List<ContainerSimulator> toBeScheduled =
+            getToBeScheduledContainers(pendingContainers, amStartTime);
+        if (toBeScheduled.size() > 0) {
+          ask = packageRequests(toBeScheduled, PRIORITY);
+          LOG.info("Application {} sends out request for {} containers.",
+              appId, toBeScheduled.size());
+          scheduledContainers.addAll(toBeScheduled);
+          pendingContainers.removeAll(toBeScheduled);
+          toBeScheduled.clear();
+        }
+      }
+    }
+
+    if (ask == null) {
+      ask = new ArrayList<>();
+    }
+
+    final AllocateRequest request = createAllocateRequest(ask);
+    if (totalContainers == 0) {
+      request.setProgress(1.0f);
+    } else {
+      request.setProgress((float) finishedContainers / totalContainers);
+    }
+
+    UserGroupInformation ugi =
+        UserGroupInformation.createRemoteUser(appAttemptId.toString());
+    Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps()
+        .get(appAttemptId.getApplicationId())
+        .getRMAppAttempt(appAttemptId).getAMRMToken();
+    ugi.addTokenIdentifier(token.decodeIdentifier());
+    AllocateResponse response = ugi.doAs(
+        (PrivilegedExceptionAction<AllocateResponse>) () -> rm
+            .getApplicationMasterService().allocate(request));
+    if (response != null) {
+      responseQueue.put(response);
+    }
+  }
+
+  @VisibleForTesting
+  public List<ContainerSimulator> getToBeScheduledContainers(
+      List<ContainerSimulator> containers, long startTime) {
+    List<ContainerSimulator> toBeScheduled = new LinkedList<>();
+    for (ContainerSimulator cs : containers) {
+      // only request the container once its request delay has elapsed
+      if (cs.getRequestDelay() + startTime <=
+          System.currentTimeMillis()) {
+        toBeScheduled.add(cs);
+      }
+    }
+    return toBeScheduled;
+  }
+
+  @Override
+  protected void checkStop() {
+    if (isFinished) {
+      super.setEndTime(System.currentTimeMillis());
+    }
+  }
+
+  @Override
+  public void lastStep() throws Exception {
+    super.lastStep();
+
+    //clear data structures.
+    allContainers.clear();
+    pendingContainers.clear();
+    scheduledContainers.clear();
+    assignedContainers.clear();
+    completedContainers.clear();
+  }
+
+  /**
+   * Restart running because the AM container was killed.
+   */
+  private void restart() {
+    isFinished = false;
+    pendingContainers.clear();
+    pendingContainers.addAll(allContainers);
+    pendingContainers.removeAll(completedContainers);
+    amContainer = null;
+  }
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
index fc6be73..34b89b6 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
@@ -125,4 +125,7 @@ public class SLSConfiguration {
       + "execution.type";
   public static final String TASK_ALLOCATION_ID = TASK_CONTAINER
       + "allocation.id";
+  public static final String TASK_REQUEST_DELAY = TASK_CONTAINER
+      + "request.delay";
+
 }
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java
index 06d8162..e83ee91 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java
@@ -38,6 +38,8 @@ public class ContainerSimulator implements Delayed {
   private long endTime;
   // life time (ms)
   private long lifeTime;
+  // time(ms) after which container would be requested by AM
+  private long requestDelay;
   // host name
   private String hostname;
   // priority
@@ -63,21 +65,24 @@ public class ContainerSimulator implements Delayed {
    */
   public ContainerSimulator(Resource resource, long lifeTime,
       String hostname, int priority, String type, ExecutionType executionType) {
-    this(resource, lifeTime, hostname, priority, type, executionType, -1);
+    this(resource, lifeTime, hostname, priority, type,
+        executionType, -1, 0);
   }
 
   /**
    * invoked when AM schedules containers to allocate.
    */
+  @SuppressWarnings("checkstyle:parameternumber")
   public ContainerSimulator(Resource resource, long lifeTime,
       String hostname, int priority, String type, ExecutionType executionType,
-      long allocationId) {
+      long allocationId, long requestDelay) {
     this.resource = resource;
     this.lifeTime = lifeTime;
     this.hostname = hostname;
     this.priority = priority;
     this.type = type;
     this.executionType = executionType;
+    this.requestDelay = requestDelay;
     this.allocationId = allocationId;
   }
 
@@ -148,4 +153,8 @@ public class ContainerSimulator implements Delayed {
   public long getAllocationId() {
     return allocationId;
   }
+
+  public long getRequestDelay() {
+    return requestDelay;
+  }
 }
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestDagAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestDagAMSimulator.java
new file mode 100644
index 0000000..8ac7fff
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestDagAMSimulator.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.yarn.sls.appmaster.DAGAMSimulator;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for DAGAMSimulator.
+ */
+public class TestDagAMSimulator {
+
+  /**
+   * Test to check whether containers are scheduled based on request delay.
+   * @throws Exception
+   */
+  @Test
+  public void testGetToBeScheduledContainers() throws Exception {
+    DAGAMSimulator dagamSimulator = new DAGAMSimulator();
+    List<ContainerSimulator> containerSimulators = new ArrayList<>();
+
+    // containers are requested with 0, 1000, 1500 and 4000ms delay.
+    containerSimulators.add(createContainerSim(1, 0));
+    containerSimulators.add(createContainerSim(2, 1000));
+    containerSimulators.add(createContainerSim(3, 1500));
+    containerSimulators.add(createContainerSim(4, 4000));
+
+    long startTime = System.currentTimeMillis();
+    List<ContainerSimulator> res = dagamSimulator.getToBeScheduledContainers(
+        containerSimulators, startTime);
+    // we should get only one container with request delay set to 0
+    assertEquals(1, res.size());
+    assertEquals(1, res.get(0).getAllocationId());
+
+    startTime -= 1000;
+    res = dagamSimulator.getToBeScheduledContainers(
+        containerSimulators, startTime);
+    // we should get containers with request delay set <= 1000
+    assertEquals(2, res.size());
+    assertEquals(1, res.get(0).getAllocationId());
+    assertEquals(2, res.get(1).getAllocationId());
+
+    startTime -= 2000;
+    res = dagamSimulator.getToBeScheduledContainers(
+        containerSimulators, startTime);
+    // we should get containers with request delay set <= 3000
+    assertEquals(3, res.size());
+    assertEquals(1, res.get(0).getAllocationId());
+    assertEquals(2, res.get(1).getAllocationId());
+    assertEquals(3, res.get(2).getAllocationId());
+  }
+
+  private ContainerSimulator createContainerSim(long allocationId,
+      long requestDelay) {
+    return new ContainerSimulator(null, 1000, "*", 1, "Map",
+        null, allocationId, requestDelay);
+  }
+}
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSDagAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSDagAMSimulator.java
new file mode 100644
index 0000000..54158c0
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSDagAMSimulator.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * This test performs simple runs of the SLS with a DAG trace in SLS json format.
+ */
+@RunWith(value = Parameterized.class)
+@NotThreadSafe
+public class TestSLSDagAMSimulator extends BaseSLSRunnerTest {
+
+  @Parameters(name = "Testing with: {1}, {0}, (nodeFile {3})")
+  public static Collection<Object[]> data() {
+
+    String capScheduler = CapacityScheduler.class.getCanonicalName();
+    String fairScheduler = FairScheduler.class.getCanonicalName();
+    String synthTraceFile = "src/test/resources/sls_dag.json";
+    String nodeFile = "src/test/resources/nodes.json";
+
+    // Test with both schedulers
+    return Arrays.asList(new Object[][] {
+
+        // covering the no nodeFile case
+        {capScheduler, "SLS", synthTraceFile, null },
+
+        // covering new commandline and CapacityScheduler
+        {capScheduler, "SLS", synthTraceFile, nodeFile },
+
+        // covering FairScheduler
+        {fairScheduler, "SLS", synthTraceFile, nodeFile },
+    });
+  }
+
+  @Before
+  public void setup() {
+    ongoingInvariantFile = "src/test/resources/ongoing-invariants.txt";
+    exitInvariantFile = "src/test/resources/exit-invariants.txt";
+  }
+
+  @Test(timeout = 90000)
+  @SuppressWarnings("all")
+  public void testSimulatorRunning() throws Exception {
+    Configuration conf = new Configuration(false);
+    long timeTillShutdownInsec = 20L;
+    runSLS(conf, timeTillShutdownInsec);
+  }
+}
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
index 8b750a5..ec7c81d 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
@@ -244,11 +244,11 @@ public class TestAMSimulator {
 
     containerSimulators.clear();
     s1 = new ContainerSimulator(resource, 100,
-        "/default-rack/h1", priority, type, execType, 1);
+        "/default-rack/h1", priority, type, execType, 1, 0);
     s2 = new ContainerSimulator(resource, 100,
-        "/default-rack/h1", priority, type, execType, 2);
+        "/default-rack/h1", priority, type, execType, 2, 0);
     s3 = new ContainerSimulator(resource, 100,
-        "/default-rack/h2", priority, type, execType, 1);
+        "/default-rack/h2", priority, type, execType, 1, 0);
 
     containerSimulators.add(s1);
     containerSimulators.add(s2);
diff --git a/hadoop-tools/hadoop-sls/src/test/resources/sls-runner.xml b/hadoop-tools/hadoop-sls/src/test/resources/sls-runner.xml
index 344024a..1c76aa9 100644
--- a/hadoop-tools/hadoop-sls/src/test/resources/sls-runner.xml
+++ b/hadoop-tools/hadoop-sls/src/test/resources/sls-runner.xml
@@ -49,6 +49,10 @@
     <name>yarn.sls.am.type.stream</name>
     <value>org.apache.hadoop.yarn.sls.appmaster.StreamAMSimulator</value>
   </property>
+  <property>
+    <name>yarn.sls.am.type.dag</name>
+    <value>org.apache.hadoop.yarn.sls.appmaster.DAGAMSimulator</value>
+  </property>
 
   <!-- Containers configuration -->
   <property>
diff --git a/hadoop-tools/hadoop-sls/src/test/resources/sls_dag.json b/hadoop-tools/hadoop-sls/src/test/resources/sls_dag.json
new file mode 100644
index 0000000..13e2adc
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/test/resources/sls_dag.json
@@ -0,0 +1,62 @@
+{
+  "yarn.sls.am.type": "dag",
+  "job.start.ms": 0,
+  "job.end.ms": 95375,
+  "job.queue.name": "sls_queue_1",
+  "job.id": "job_1",
+  "job.user": "default",
+  "job.tasks": [
+    {
+      "container.host": "/default-rack/node1",
+      "container.request.delay": 1000,
+      "container.start.ms": 6664,
+      "container.end.ms": 23707,
+      "container.priority": 20,
+      "container.type": "map",
+      "container.execution.type": "GUARANTEED"
+    },
+    {
+      "container.host": "/default-rack/node3",
+      "container.request.delay": 1500,
+      "container.start.ms": 6665,
+      "container.end.ms": 21593,
+      "container.priority": 20,
+      "container.type": "map",
+      "container.execution.type": "GUARANTEED"
+    },
+    {
+      "container.host": "/default-rack/node2",
+      "container.start.ms": 68770,
+      "container.end.ms": 86613,
+      "container.priority": 20,
+      "container.type": "map",
+      "container.execution.type": "GUARANTEED"
+    }
+  ]
+}
+{
+  "yarn.sls.am.type": "dag",
+  "job.start.ms": 105204,
+  "job.end.ms": 197256,
+  "job.queue.name": "sls_queue_2",
+  "job.id": "job_2",
+  "job.user": "default",
+  "job.tasks": [
+    {
+      "container.host": "/default-rack/node1",
+      "container.start.ms": 111822,
+      "container.end.ms": 133985,
+      "container.priority": 20,
+      "container.type": "map",
+      "container.execution.type": "GUARANTEED"
+    },
+    {
+      "container.host": "/default-rack/node2",
+      "container.start.ms": 111788,
+      "container.end.ms": 131377,
+      "container.priority": 20,
+      "container.type": "map",
+      "container.execution.type": "GUARANTEED"
+    }
+  ]
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org