You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ab...@apache.org on 2019/08/29 06:13:58 UTC
[hadoop] branch trunk updated: YARN-9754. Add support for arbitrary
DAG AM Simulator. Contributed by Abhishek Modi.
This is an automated email from the ASF dual-hosted git repository.
abmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 16576fd YARN-9754. Add support for arbitrary DAG AM Simulator. Contributed by Abhishek Modi.
16576fd is described below
commit 16576fde8e3697025e1d37a4c21f8ad904208f8e
Author: Abhishek Modi <ab...@apache.org>
AuthorDate: Thu Aug 29 11:43:40 2019 +0530
YARN-9754. Add support for arbitrary DAG AM Simulator. Contributed by Abhishek Modi.
---
hadoop-tools/hadoop-sls/pom.xml | 1 +
.../java/org/apache/hadoop/yarn/sls/SLSRunner.java | 10 +-
.../hadoop/yarn/sls/appmaster/DAGAMSimulator.java | 285 +++++++++++++++++++++
.../hadoop/yarn/sls/conf/SLSConfiguration.java | 3 +
.../yarn/sls/scheduler/ContainerSimulator.java | 13 +-
.../apache/hadoop/yarn/sls/TestDagAMSimulator.java | 80 ++++++
.../hadoop/yarn/sls/TestSLSDagAMSimulator.java | 76 ++++++
.../hadoop/yarn/sls/appmaster/TestAMSimulator.java | 6 +-
.../hadoop-sls/src/test/resources/sls-runner.xml | 4 +
.../hadoop-sls/src/test/resources/sls_dag.json | 62 +++++
10 files changed, 534 insertions(+), 6 deletions(-)
diff --git a/hadoop-tools/hadoop-sls/pom.xml b/hadoop-tools/hadoop-sls/pom.xml
index 0df8e78..848b8c5 100644
--- a/hadoop-tools/hadoop-sls/pom.xml
+++ b/hadoop-tools/hadoop-sls/pom.xml
@@ -139,6 +139,7 @@
<exclude>src/test/resources/nodes-with-resources.json</exclude>
<exclude>src/test/resources/exit-invariants.txt</exclude>
<exclude>src/test/resources/ongoing-invariants.txt</exclude>
+ <exclude>src/test/resources/sls_dag.json</exclude>
</excludes>
</configuration>
</plugin>
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 3257915..6ed28d9 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -563,10 +563,18 @@ public class SLSRunner extends Configured implements Tool {
allocationId = Long.parseLong(
jsonTask.get(SLSConfiguration.TASK_ALLOCATION_ID).toString());
}
+
+ long requestDelay = 0;
+ if (jsonTask.containsKey(SLSConfiguration.TASK_REQUEST_DELAY)) {
+ requestDelay = Long.parseLong(
+ jsonTask.get(SLSConfiguration.TASK_REQUEST_DELAY).toString());
+ }
+ requestDelay = Math.max(requestDelay, 0);
+
for (int i = 0; i < count; i++) {
containers.add(
new ContainerSimulator(res, duration, hostname, priority, type,
- executionType, allocationId));
+ executionType, allocationId, requestDelay));
}
}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java
new file mode 100644
index 0000000..7e2d0a7
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.yarn.sls.appmaster;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.sls.SLSRunner;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * AMSimulator that simulates a DAG application - it requests containers
+ * only after the request delay specified for each of them has elapsed.
+ * It finishes when all the tasks are completed.
+ *
+ * Vocabulary Used:
+ * pending -> requests which are NOT yet sent to RM.
+ * scheduled -> requests which are sent to RM but not yet assigned.
+ * assigned -> requests which are assigned to a container.
+ * completed -> requests whose corresponding container has completed.
+ * Containers are requested based on the request delay.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class DAGAMSimulator extends AMSimulator {
+
+ private static final int PRIORITY = 20;
+
+ private List<ContainerSimulator> pendingContainers =
+ new LinkedList<>();
+
+ private List<ContainerSimulator> scheduledContainers =
+ new LinkedList<>();
+
+ private Map<ContainerId, ContainerSimulator> assignedContainers =
+ new HashMap<>();
+
+ private List<ContainerSimulator> completedContainers =
+ new LinkedList<>();
+
+ private List<ContainerSimulator> allContainers =
+ new LinkedList<>();
+
+ private boolean isFinished = false;
+
+ private long amStartTime;
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DAGAMSimulator.class);
+
+ @SuppressWarnings("checkstyle:parameternumber")
+ public void init(int heartbeatInterval,
+ List<ContainerSimulator> containerList, ResourceManager resourceManager,
+ SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
+ String simQueue, boolean tracked, String oldApp, long baseTimeMS,
+ Resource amResource, String nodeLabelExpr, Map<String, String> params,
+ Map<ApplicationId, AMSimulator> appIdAMSim) {
+ super.init(heartbeatInterval, containerList, resourceManager, slsRunnner,
+ startTime, finishTime, simUser, simQueue, tracked, oldApp, baseTimeMS,
+ amResource, nodeLabelExpr, params, appIdAMSim);
+ super.amtype = "dag";
+
+ allContainers.addAll(containerList);
+ pendingContainers.addAll(containerList);
+ totalContainers = allContainers.size();
+
+ LOG.info("Added new job with {} containers", allContainers.size());
+ }
+
+ @Override
+ public void firstStep() throws Exception {
+ super.firstStep();
+ amStartTime = System.currentTimeMillis();
+ }
+
+ @Override
+ public void initReservation(ReservationId reservationId,
+ long deadline, long now) {
+ // DAG AM doesn't support reservation
+ setReservationRequest(null);
+ }
+
+ @Override
+ public synchronized void notifyAMContainerLaunched(Container masterContainer)
+ throws Exception {
+ if (null != masterContainer) {
+ restart();
+ super.notifyAMContainerLaunched(masterContainer);
+ }
+ }
+
+ protected void processResponseQueue() throws Exception {
+ while (!responseQueue.isEmpty()) {
+ AllocateResponse response = responseQueue.take();
+
+ // check completed containers
+ if (!response.getCompletedContainersStatuses().isEmpty()) {
+ for (ContainerStatus cs : response.getCompletedContainersStatuses()) {
+ ContainerId containerId = cs.getContainerId();
+ if (cs.getExitStatus() == ContainerExitStatus.SUCCESS) {
+ if (assignedContainers.containsKey(containerId)) {
+ LOG.debug("Application {} has one container finished ({}).",
+ appId, containerId);
+ ContainerSimulator containerSimulator =
+ assignedContainers.remove(containerId);
+ finishedContainers++;
+ completedContainers.add(containerSimulator);
+ } else if (amContainer.getId().equals(containerId)) {
+ // am container released event
+ isFinished = true;
+ LOG.info("Application {} goes to finish.", appId);
+ }
+ if (finishedContainers >= totalContainers) {
+ lastStep();
+ }
+ } else {
+ // container did not exit successfully (e.g. it was killed)
+ if (assignedContainers.containsKey(containerId)) {
+ LOG.error("Application {} has one container killed ({}).", appId,
+ containerId);
+ pendingContainers.add(assignedContainers.remove(containerId));
+ } else if (amContainer.getId().equals(containerId)) {
+ LOG.error("Application {}'s AM is "
+ + "going to be killed. Waiting for rescheduling...", appId);
+ }
+ }
+ }
+ }
+
+ // check finished
+ if (isAMContainerRunning &&
+ (finishedContainers >= totalContainers)) {
+ isAMContainerRunning = false;
+ LOG.info("Application {} sends out event to clean up"
+ + " its AM container.", appId);
+ isFinished = true;
+ break;
+ }
+
+ // check allocated containers
+ for (Container container : response.getAllocatedContainers()) {
+ if (!scheduledContainers.isEmpty()) {
+ ContainerSimulator cs = scheduledContainers.remove(0);
+ LOG.debug("Application {} starts to launch a container ({}).",
+ appId, container.getId());
+ assignedContainers.put(container.getId(), cs);
+ se.getNmMap().get(container.getNodeId())
+ .addNewContainer(container, cs.getLifeTime());
+ }
+ }
+ }
+ }
+
+ @Override
+ protected void sendContainerRequest() throws Exception {
+ if (isFinished) {
+ return;
+ }
+ // send out request
+ List<ResourceRequest> ask = null;
+ if (finishedContainers != totalContainers) {
+ if (!pendingContainers.isEmpty()) {
+ List<ContainerSimulator> toBeScheduled =
+ getToBeScheduledContainers(pendingContainers, amStartTime);
+ if (toBeScheduled.size() > 0) {
+ ask = packageRequests(toBeScheduled, PRIORITY);
+ LOG.info("Application {} sends out request for {} containers.",
+ appId, toBeScheduled.size());
+ scheduledContainers.addAll(toBeScheduled);
+ pendingContainers.removeAll(toBeScheduled);
+ toBeScheduled.clear();
+ }
+ }
+ }
+
+ if (ask == null) {
+ ask = new ArrayList<>();
+ }
+
+ final AllocateRequest request = createAllocateRequest(ask);
+ if (totalContainers == 0) {
+ request.setProgress(1.0f);
+ } else {
+ request.setProgress((float) finishedContainers / totalContainers);
+ }
+
+ UserGroupInformation ugi =
+ UserGroupInformation.createRemoteUser(appAttemptId.toString());
+ Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps()
+ .get(appAttemptId.getApplicationId())
+ .getRMAppAttempt(appAttemptId).getAMRMToken();
+ ugi.addTokenIdentifier(token.decodeIdentifier());
+ AllocateResponse response = ugi.doAs(
+ (PrivilegedExceptionAction<AllocateResponse>) () -> rm
+ .getApplicationMasterService().allocate(request));
+ if (response != null) {
+ responseQueue.put(response);
+ }
+ }
+
+ @VisibleForTesting
+ public List<ContainerSimulator> getToBeScheduledContainers(
+ List<ContainerSimulator> containers, long startTime) {
+ List<ContainerSimulator> toBeScheduled = new LinkedList<>();
+ for (ContainerSimulator cs : containers) {
+ // only request for the container if it is time to request
+ if (cs.getRequestDelay() + startTime <=
+ System.currentTimeMillis()) {
+ toBeScheduled.add(cs);
+ }
+ }
+ return toBeScheduled;
+ }
+
+ @Override
+ protected void checkStop() {
+ if (isFinished) {
+ super.setEndTime(System.currentTimeMillis());
+ }
+ }
+
+ @Override
+ public void lastStep() throws Exception {
+ super.lastStep();
+
+ //clear data structures.
+ allContainers.clear();
+ pendingContainers.clear();
+ scheduledContainers.clear();
+ assignedContainers.clear();
+ completedContainers.clear();
+ }
+
+ /**
+ * Reset state so the job restarts after the AM container was killed.
+ */
+ private void restart() {
+ isFinished = false;
+ pendingContainers.clear();
+ pendingContainers.addAll(allContainers);
+ pendingContainers.removeAll(completedContainers);
+ amContainer = null;
+ }
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
index fc6be73..34b89b6 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
@@ -125,4 +125,7 @@ public class SLSConfiguration {
+ "execution.type";
public static final String TASK_ALLOCATION_ID = TASK_CONTAINER
+ "allocation.id";
+ public static final String TASK_REQUEST_DELAY = TASK_CONTAINER
+ + "request.delay";
+
}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java
index 06d8162..e83ee91 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java
@@ -38,6 +38,8 @@ public class ContainerSimulator implements Delayed {
private long endTime;
// life time (ms)
private long lifeTime;
+ // time(ms) after which container would be requested by AM
+ private long requestDelay;
// host name
private String hostname;
// priority
@@ -63,21 +65,24 @@ public class ContainerSimulator implements Delayed {
*/
public ContainerSimulator(Resource resource, long lifeTime,
String hostname, int priority, String type, ExecutionType executionType) {
- this(resource, lifeTime, hostname, priority, type, executionType, -1);
+ this(resource, lifeTime, hostname, priority, type,
+ executionType, -1, 0);
}
/**
* invoked when AM schedules containers to allocate.
*/
+ @SuppressWarnings("checkstyle:parameternumber")
public ContainerSimulator(Resource resource, long lifeTime,
String hostname, int priority, String type, ExecutionType executionType,
- long allocationId) {
+ long allocationId, long requestDelay) {
this.resource = resource;
this.lifeTime = lifeTime;
this.hostname = hostname;
this.priority = priority;
this.type = type;
this.executionType = executionType;
+ this.requestDelay = requestDelay;
this.allocationId = allocationId;
}
@@ -148,4 +153,8 @@ public class ContainerSimulator implements Delayed {
public long getAllocationId() {
return allocationId;
}
+
+ public long getRequestDelay() {
+ return requestDelay;
+ }
}
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestDagAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestDagAMSimulator.java
new file mode 100644
index 0000000..8ac7fff
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestDagAMSimulator.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.yarn.sls.appmaster.DAGAMSimulator;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for DAGAMSimulator.
+ */
+public class TestDagAMSimulator {
+
+ /**
+ * Test to check whether containers are scheduled based on request delay.
+ * @throws Exception
+ */
+ @Test
+ public void testGetToBeScheduledContainers() throws Exception {
+ DAGAMSimulator dagamSimulator = new DAGAMSimulator();
+ List<ContainerSimulator> containerSimulators = new ArrayList<>();
+
+ // containers are requested with 0, 1000, 1500 and 4000ms delay.
+ containerSimulators.add(createContainerSim(1, 0));
+ containerSimulators.add(createContainerSim(2, 1000));
+ containerSimulators.add(createContainerSim(3, 1500));
+ containerSimulators.add(createContainerSim(4, 4000));
+
+ long startTime = System.currentTimeMillis();
+ List<ContainerSimulator> res = dagamSimulator.getToBeScheduledContainers(
+ containerSimulators, startTime);
+ // we should get only one container with request delay set to 0
+ assertEquals(1, res.size());
+ assertEquals(1, res.get(0).getAllocationId());
+
+ startTime -= 1000;
+ res = dagamSimulator.getToBeScheduledContainers(
+ containerSimulators, startTime);
+ // we should get containers with request delay set <= 1000
+ assertEquals(2, res.size());
+ assertEquals(1, res.get(0).getAllocationId());
+ assertEquals(2, res.get(1).getAllocationId());
+
+ startTime -= 2000;
+ res = dagamSimulator.getToBeScheduledContainers(
+ containerSimulators, startTime);
+ // startTime is now 3000ms back: expect containers with delay <= 3000
+ assertEquals(3, res.size());
+ assertEquals(1, res.get(0).getAllocationId());
+ assertEquals(2, res.get(1).getAllocationId());
+ assertEquals(3, res.get(2).getAllocationId());
+ }
+
+ private ContainerSimulator createContainerSim(long allocationId,
+ long requestDelay) {
+ return new ContainerSimulator(null, 1000, "*", 1, "Map",
+ null, allocationId, requestDelay);
+ }
+}
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSDagAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSDagAMSimulator.java
new file mode 100644
index 0000000..54158c0
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSDagAMSimulator.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * This test performs simple runs of the SLS with the DAG AM simulator.
+ */
+@RunWith(value = Parameterized.class)
+@NotThreadSafe
+public class TestSLSDagAMSimulator extends BaseSLSRunnerTest {
+
+ @Parameters(name = "Testing with: {1}, {0}, (nodeFile {3})")
+ public static Collection<Object[]> data() {
+
+ String capScheduler = CapacityScheduler.class.getCanonicalName();
+ String fairScheduler = FairScheduler.class.getCanonicalName();
+ String synthTraceFile = "src/test/resources/sls_dag.json";
+ String nodeFile = "src/test/resources/nodes.json";
+
+ // Test with both schedulers
+ return Arrays.asList(new Object[][] {
+
+ // covering the no nodeFile case
+ {capScheduler, "SLS", synthTraceFile, null },
+
+ // covering new commandline and CapacityScheduler
+ {capScheduler, "SLS", synthTraceFile, nodeFile },
+
+ // covering FairScheduler
+ {fairScheduler, "SLS", synthTraceFile, nodeFile },
+ });
+ }
+
+ @Before
+ public void setup() {
+ ongoingInvariantFile = "src/test/resources/ongoing-invariants.txt";
+ exitInvariantFile = "src/test/resources/exit-invariants.txt";
+ }
+
+ @Test(timeout = 90000)
+ @SuppressWarnings("all")
+ public void testSimulatorRunning() throws Exception {
+ Configuration conf = new Configuration(false);
+ long timeTillShutdownInsec = 20L;
+ runSLS(conf, timeTillShutdownInsec);
+ }
+}
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
index 8b750a5..ec7c81d 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
@@ -244,11 +244,11 @@ public class TestAMSimulator {
containerSimulators.clear();
s1 = new ContainerSimulator(resource, 100,
- "/default-rack/h1", priority, type, execType, 1);
+ "/default-rack/h1", priority, type, execType, 1, 0);
s2 = new ContainerSimulator(resource, 100,
- "/default-rack/h1", priority, type, execType, 2);
+ "/default-rack/h1", priority, type, execType, 2, 0);
s3 = new ContainerSimulator(resource, 100,
- "/default-rack/h2", priority, type, execType, 1);
+ "/default-rack/h2", priority, type, execType, 1, 0);
containerSimulators.add(s1);
containerSimulators.add(s2);
diff --git a/hadoop-tools/hadoop-sls/src/test/resources/sls-runner.xml b/hadoop-tools/hadoop-sls/src/test/resources/sls-runner.xml
index 344024a..1c76aa9 100644
--- a/hadoop-tools/hadoop-sls/src/test/resources/sls-runner.xml
+++ b/hadoop-tools/hadoop-sls/src/test/resources/sls-runner.xml
@@ -49,6 +49,10 @@
<name>yarn.sls.am.type.stream</name>
<value>org.apache.hadoop.yarn.sls.appmaster.StreamAMSimulator</value>
</property>
+ <property>
+ <name>yarn.sls.am.type.dag</name>
+ <value>org.apache.hadoop.yarn.sls.appmaster.DAGAMSimulator</value>
+ </property>
<!-- Containers configuration -->
<property>
diff --git a/hadoop-tools/hadoop-sls/src/test/resources/sls_dag.json b/hadoop-tools/hadoop-sls/src/test/resources/sls_dag.json
new file mode 100644
index 0000000..13e2adc
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/test/resources/sls_dag.json
@@ -0,0 +1,62 @@
+{
+ "yarn.sls.am.type": "dag",
+ "job.start.ms": 0,
+ "job.end.ms": 95375,
+ "job.queue.name": "sls_queue_1",
+ "job.id": "job_1",
+ "job.user": "default",
+ "job.tasks": [
+ {
+ "container.host": "/default-rack/node1",
+ "container.request.delay": 1000,
+ "container.start.ms": 6664,
+ "container.end.ms": 23707,
+ "container.priority": 20,
+ "container.type": "map",
+ "container.execution.type": "GUARANTEED"
+ },
+ {
+ "container.host": "/default-rack/node3",
+ "container.request.delay": 1500,
+ "container.start.ms": 6665,
+ "container.end.ms": 21593,
+ "container.priority": 20,
+ "container.type": "map",
+ "container.execution.type": "GUARANTEED"
+ },
+ {
+ "container.host": "/default-rack/node2",
+ "container.start.ms": 68770,
+ "container.end.ms": 86613,
+ "container.priority": 20,
+ "container.type": "map",
+ "container.execution.type": "GUARANTEED"
+ }
+ ]
+}
+{
+ "yarn.sls.am.type": "dag",
+ "job.start.ms": 105204,
+ "job.end.ms": 197256,
+ "job.queue.name": "sls_queue_2",
+ "job.id": "job_2",
+ "job.user": "default",
+ "job.tasks": [
+ {
+ "container.host": "/default-rack/node1",
+ "container.start.ms": 111822,
+ "container.end.ms": 133985,
+ "container.priority": 20,
+ "container.type": "map",
+ "container.execution.type": "GUARANTEED"
+ },
+ {
+ "container.host": "/default-rack/node2",
+ "container.start.ms": 111788,
+ "container.end.ms": 131377,
+ "container.priority": 20,
+ "container.type": "map",
+ "container.execution.type": "GUARANTEED"
+ }
+ ]
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org