You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by qu...@apache.org on 2022/03/24 05:17:33 UTC
[hadoop] branch trunk updated: YARN-10547. Decouple job parsing logic from SLSRunner. Contributed by Szilard Nemeth.
This is an automated email from the ASF dual-hosted git repository.
quapaw pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 077c6c6 YARN-10547. Decouple job parsing logic from SLSRunner. Contributed by Szilard Nemeth.
077c6c6 is described below
commit 077c6c62d6c1ed89e209449a5f9c5849b05e7dff
Author: 9uapaw <gy...@gmail.com>
AuthorDate: Thu Mar 24 06:16:13 2022 +0100
YARN-10547. Decouple job parsing logic from SLSRunner. Contributed by Szilard Nemeth.
---
.../org/apache/hadoop/yarn/sls/AMDefinition.java | 105 ++++++
.../hadoop/yarn/sls/AMDefinitionFactory.java | 133 ++++++++
.../apache/hadoop/yarn/sls/AMDefinitionRumen.java | 167 +++++++++
.../apache/hadoop/yarn/sls/AMDefinitionSLS.java | 186 ++++++++++
.../apache/hadoop/yarn/sls/AMDefinitionSynth.java | 146 ++++++++
.../org/apache/hadoop/yarn/sls/JobDefinition.java | 87 +++++
.../java/org/apache/hadoop/yarn/sls/SLSRunner.java | 376 +++------------------
.../hadoop/yarn/sls/TaskContainerDefinition.java | 248 ++++++++++++++
.../hadoop/yarn/sls/appmaster/AMSimulator.java | 39 ++-
.../hadoop/yarn/sls/appmaster/DAGAMSimulator.java | 20 +-
.../hadoop/yarn/sls/appmaster/MRAMSimulator.java | 17 +-
.../yarn/sls/appmaster/StreamAMSimulator.java | 23 +-
.../yarn/sls/scheduler/ContainerSimulator.java | 37 +-
.../org/apache/hadoop/yarn/sls/utils/SLSUtils.java | 2 -
.../apache/hadoop/yarn/sls/TestDagAMSimulator.java | 16 +-
.../hadoop/yarn/sls/appmaster/TestAMSimulator.java | 95 ++++--
16 files changed, 1272 insertions(+), 425 deletions(-)
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinition.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinition.java
new file mode 100644
index 0000000..1f9e351
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinition.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+
+import java.util.List;
+
+public abstract class AMDefinition {
+ protected int jobCount;
+ protected String amType;
+ protected String user;
+ protected String queue;
+ protected long jobStartTime;
+ protected long jobFinishTime;
+ protected List<ContainerSimulator> taskContainers;
+ protected Resource amResource;
+ protected String labelExpression;
+ protected String oldAppId;
+
+ public AMDefinition(AmDefinitionBuilder builder) {
+ this.jobStartTime = builder.jobStartTime;
+ this.jobFinishTime = builder.jobFinishTime;
+ this.amType = builder.amType;
+ this.taskContainers = builder.taskContainers;
+ this.labelExpression = builder.labelExpression;
+ this.user = builder.user;
+ this.amResource = builder.amResource;
+ this.queue = builder.queue;
+ this.jobCount = builder.jobCount;
+ this.oldAppId = builder.jobId;
+ }
+
+ public String getAmType() {
+ return amType;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public String getOldAppId() {
+ return oldAppId;
+ }
+
+ public long getJobStartTime() {
+ return jobStartTime;
+ }
+
+ public long getJobFinishTime() {
+ return jobFinishTime;
+ }
+
+ public List<ContainerSimulator> getTaskContainers() {
+ return taskContainers;
+ }
+
+ public Resource getAmResource() {
+ return amResource;
+ }
+
+ public String getLabelExpression() {
+ return labelExpression;
+ }
+
+ public String getQueue() {
+ return queue;
+ }
+
+ public int getJobCount() {
+ return jobCount;
+ }
+
+
+ public abstract static class AmDefinitionBuilder {
+ private static final String DEFAULT_USER = "default";
+
+ protected int jobCount = 1;
+ protected String amType = AMDefinitionFactory.DEFAULT_JOB_TYPE;
+ protected String user = DEFAULT_USER;
+ protected String queue;
+ protected String jobId;
+ protected long jobStartTime;
+ protected long jobFinishTime;
+ protected List<ContainerSimulator> taskContainers;
+ protected Resource amResource;
+ protected String labelExpression = null;
+
+ }
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionFactory.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionFactory.java
new file mode 100644
index 0000000..2bbe7bb
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionFactory.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.tools.rumen.LoggedJob;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public final class AMDefinitionFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ AMDefinitionFactory.class);
+ public final static String DEFAULT_JOB_TYPE = "mapreduce";
+
+ private AMDefinitionFactory() {}
+
+ public static AMDefinitionSLS createFromSlsTrace(Map<?, ?> jsonJob,
+ SLSRunner slsRunner) throws YarnException {
+ AMDefinitionSLS amDefinition = AMDefinitionSLS.Builder.create(jsonJob)
+ .withAmType(SLSConfiguration.AM_TYPE)
+ .withAmResource(getAMContainerResourceSLS(jsonJob, slsRunner))
+ .withTaskContainers(
+ AMDefinitionSLS.getTaskContainers(jsonJob, slsRunner))
+ .withJobStartTime(SLSConfiguration.JOB_START_MS)
+ .withJobFinishTime(SLSConfiguration.JOB_END_MS)
+ .withLabelExpression(SLSConfiguration.JOB_LABEL_EXPR)
+ .withUser(SLSConfiguration.JOB_USER)
+ .withQueue(SLSConfiguration.JOB_QUEUE_NAME)
+ .withJobId(SLSConfiguration.JOB_ID)
+ .withJobCount(SLSConfiguration.JOB_COUNT)
+ .build();
+ slsRunner.increaseQueueAppNum(amDefinition.getQueue());
+ return amDefinition;
+ }
+
+ public static AMDefinitionRumen createFromRumenTrace(LoggedJob job,
+ long baselineTimeMs, SLSRunner slsRunner) throws YarnException {
+ AMDefinitionRumen amDefinition = AMDefinitionRumen.Builder.create()
+ .withAmType(DEFAULT_JOB_TYPE)
+ .withAmResource(getAMContainerResourceSynthAndRumen(slsRunner))
+ .withTaskContainers(
+ AMDefinitionRumen.getTaskContainers(job, slsRunner))
+ .withJobStartTime(job.getSubmitTime())
+ .withJobFinishTime(job.getFinishTime())
+ .withBaseLineTimeMs(baselineTimeMs)
+ .withUser(job.getUser())
+ .withQueue(job.getQueue().getValue())
+ .withJobId(job.getJobID().toString())
+ .build();
+ slsRunner.increaseQueueAppNum(amDefinition.getQueue());
+ return amDefinition;
+ }
+
+ public static AMDefinitionSynth createFromSynth(SynthJob job,
+ SLSRunner slsRunner) throws YarnException {
+ AMDefinitionSynth amDefinition =
+ AMDefinitionSynth.Builder.create()
+ .withAmType(job.getType())
+ .withAmResource(getAMContainerResourceSynthAndRumen(slsRunner))
+ .withTaskContainers(
+ AMDefinitionSynth.getTaskContainers(job, slsRunner))
+ .withUser(job.getUser())
+ .withQueue(job.getQueueName())
+ .withJobId(job.getJobID().toString())
+ .withJobStartTime(job.getSubmissionTime())
+ .withJobFinishTime(job.getSubmissionTime() + job.getDuration())
+ .withBaseLineTimeMs(0)
+ .build();
+
+ slsRunner.increaseQueueAppNum(amDefinition.getQueue());
+ return amDefinition;
+ }
+
+ private static Resource getAMContainerResourceSLS(Map<?, ?> jsonJob,
+ Configured configured) {
+ Resource amContainerResource =
+ SLSConfiguration.getAMContainerResource(configured.getConf());
+ if (jsonJob == null) {
+ return amContainerResource;
+ }
+
+ ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
+ for (ResourceInformation info : infors) {
+ String key = SLSConfiguration.JOB_AM_PREFIX + info.getName();
+ if (jsonJob.containsKey(key)) {
+ long value = Long.parseLong(jsonJob.get(key).toString());
+ amContainerResource.setResourceValue(info.getName(), value);
+ }
+ }
+
+ return amContainerResource;
+ }
+
+ private static Resource getAMContainerResourceSynthAndRumen(
+ Configured configured) {
+ return SLSConfiguration.getAMContainerResource(configured.getConf());
+ }
+
+ static void adjustTimeValuesToBaselineTime(AMDefinition amDef,
+ AMDefinition.AmDefinitionBuilder builder, long baselineTimeMs) {
+ builder.jobStartTime -= baselineTimeMs;
+ builder.jobFinishTime -= baselineTimeMs;
+ if (builder.jobStartTime < 0) {
+ LOG.warn("Warning: reset job {} start time to 0.", amDef.getOldAppId());
+ builder.jobFinishTime = builder.jobFinishTime - builder.jobStartTime;
+ builder.jobStartTime = 0;
+ }
+ amDef.jobStartTime = builder.jobStartTime;
+ }
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionRumen.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionRumen.java
new file mode 100644
index 0000000..cc97a90
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionRumen.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.tools.rumen.LoggedJob;
+import org.apache.hadoop.tools.rumen.LoggedTask;
+import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
+import org.apache.hadoop.tools.rumen.datatypes.UserName;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.yarn.sls.AMDefinitionFactory.adjustTimeValuesToBaselineTime;
+
+public class AMDefinitionRumen extends AMDefinition {
+ public final static int DEFAULT_MAPPER_PRIORITY = 20;
+ private final static int DEFAULT_REDUCER_PRIORITY = 10;
+
+ public AMDefinitionRumen(AmDefinitionBuilder builder) {
+ super(builder);
+ }
+
+ public static List<ContainerSimulator> getTaskContainers(LoggedJob job,
+ SLSRunner slsRunner) throws YarnException {
+ List<ContainerSimulator> containerList = new ArrayList<>();
+
+ TaskContainerDefinition.Builder builder =
+ TaskContainerDefinition.Builder.create()
+ .withCount(1)
+ .withResource(slsRunner.getDefaultContainerResource())
+ .withExecutionType(ExecutionType.GUARANTEED)
+ .withAllocationId(-1)
+ .withRequestDelay(0);
+
+ // mapper
+ for (LoggedTask mapTask : job.getMapTasks()) {
+ if (mapTask.getAttempts().size() == 0) {
+ throw new YarnException("Invalid map task, no attempt for a mapper!");
+ }
+ LoggedTaskAttempt taskAttempt =
+ mapTask.getAttempts().get(mapTask.getAttempts().size() - 1);
+ TaskContainerDefinition containerDef = builder
+ .withHostname(taskAttempt.getHostName().getValue())
+ .withDuration(taskAttempt.getFinishTime() -
+ taskAttempt.getStartTime())
+ .withPriority(DEFAULT_MAPPER_PRIORITY)
+ .withType("map")
+ .build();
+ containerList.add(
+ ContainerSimulator.createFromTaskContainerDefinition(containerDef));
+ }
+
+ // reducer
+ for (LoggedTask reduceTask : job.getReduceTasks()) {
+ if (reduceTask.getAttempts().size() == 0) {
+ throw new YarnException(
+ "Invalid reduce task, no attempt for a reducer!");
+ }
+ LoggedTaskAttempt taskAttempt =
+ reduceTask.getAttempts().get(reduceTask.getAttempts().size() - 1);
+ TaskContainerDefinition containerDef = builder
+ .withHostname(taskAttempt.getHostName().getValue())
+ .withDuration(taskAttempt.getFinishTime() -
+ taskAttempt.getStartTime())
+ .withPriority(DEFAULT_REDUCER_PRIORITY)
+ .withType("reduce")
+ .build();
+ containerList.add(
+ ContainerSimulator.createFromTaskContainerDefinition(containerDef));
+ }
+
+ return containerList;
+ }
+
+
+ public static final class Builder extends AmDefinitionBuilder {
+ private long baselineTimeMs;
+
+ private Builder() {
+ }
+
+ public static Builder create() {
+ return new Builder();
+ }
+
+ public Builder withAmType(String amType) {
+ this.amType = amType;
+ return this;
+ }
+
+ public Builder withUser(UserName user) {
+ if (user != null) {
+ this.user = user.getValue();
+ }
+ return this;
+ }
+
+ public Builder withQueue(String queue) {
+ this.queue = queue;
+ return this;
+ }
+
+ public Builder withJobId(String oldJobId) {
+ this.jobId = oldJobId;
+ return this;
+ }
+
+ public Builder withJobStartTime(long time) {
+ this.jobStartTime = time;
+ return this;
+ }
+
+ public Builder withJobFinishTime(long time) {
+ this.jobFinishTime = time;
+ return this;
+ }
+
+ public Builder withBaseLineTimeMs(long baselineTimeMs) {
+ this.baselineTimeMs = baselineTimeMs;
+ return this;
+ }
+
+ public Builder withLabelExpression(String expr) {
+ this.labelExpression = expr;
+ return this;
+ }
+
+ public AMDefinitionRumen.Builder withTaskContainers(
+ List<ContainerSimulator> taskContainers) {
+ this.taskContainers = taskContainers;
+ return this;
+ }
+
+ public AMDefinitionRumen.Builder withAmResource(Resource amResource) {
+ this.amResource = amResource;
+ return this;
+ }
+
+ public AMDefinitionRumen build() {
+ AMDefinitionRumen amDef = new AMDefinitionRumen(this);
+
+ if (baselineTimeMs == 0) {
+ baselineTimeMs = jobStartTime;
+ }
+ adjustTimeValuesToBaselineTime(amDef, this, baselineTimeMs);
+ return amDef;
+ }
+
+ }
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSLS.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSLS.java
new file mode 100644
index 0000000..7439ddf
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSLS.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class AMDefinitionSLS extends AMDefinition {
+ public AMDefinitionSLS(AmDefinitionBuilder builder) {
+ super(builder);
+ }
+
+ public String getQueue() {
+ return queue;
+ }
+
+ public static List<ContainerSimulator> getTaskContainers(Map<?, ?> jsonJob,
+ SLSRunner slsRunner) throws YarnException {
+ List<Map<?, ?>> tasks = (List) jsonJob.get(SLSConfiguration.JOB_TASKS);
+ if (tasks == null || tasks.size() == 0) {
+ throw new YarnException("No task for the job!");
+ }
+
+ List<ContainerSimulator> containers = new ArrayList<>();
+ for (Map<?, ?> jsonTask : tasks) {
+ TaskContainerDefinition containerDef =
+ TaskContainerDefinition.Builder.create()
+ .withCount(jsonTask, SLSConfiguration.COUNT)
+ .withHostname((String) jsonTask.get(SLSConfiguration.TASK_HOST))
+ .withDuration(jsonTask, SLSConfiguration.TASK_DURATION_MS)
+ .withDurationLegacy(jsonTask, SLSConfiguration.DURATION_MS)
+ .withTaskStart(jsonTask, SLSConfiguration.TASK_START_MS)
+ .withTaskFinish(jsonTask, SLSConfiguration.TASK_END_MS)
+ .withResource(getResourceForContainer(jsonTask, slsRunner))
+ .withPriority(jsonTask, SLSConfiguration.TASK_PRIORITY)
+ .withType(jsonTask, SLSConfiguration.TASK_TYPE)
+ .withExecutionType(jsonTask, SLSConfiguration.TASK_EXECUTION_TYPE)
+ .withAllocationId(jsonTask, SLSConfiguration.TASK_ALLOCATION_ID)
+ .withRequestDelay(jsonTask, SLSConfiguration.TASK_REQUEST_DELAY)
+ .build();
+
+ for (int i = 0; i < containerDef.getCount(); i++) {
+ containers.add(ContainerSimulator.
+ createFromTaskContainerDefinition(containerDef));
+ }
+ }
+ return containers;
+ }
+
+ private static Resource getResourceForContainer(Map<?, ?> jsonTask,
+ SLSRunner slsRunner) {
+ Resource res = slsRunner.getDefaultContainerResource();
+ ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
+ for (ResourceInformation info : infors) {
+ if (jsonTask.containsKey(SLSConfiguration.TASK_PREFIX + info.getName())) {
+ long value = Long.parseLong(
+ jsonTask.get(SLSConfiguration.TASK_PREFIX + info.getName())
+ .toString());
+ res.setResourceValue(info.getName(), value);
+ }
+ }
+ return res;
+ }
+
+ public static final class Builder extends AmDefinitionBuilder {
+ private final Map<?, ?> jsonJob;
+
+ private Builder(Map<?, ?> jsonJob) {
+ this.jsonJob = jsonJob;
+ }
+
+ public static Builder create(Map<?, ?> jsonJob) {
+ return new Builder(jsonJob);
+ }
+
+ public Builder withAmType(String key) {
+ if (jsonJob.containsKey(key)) {
+ String amType = (String) jsonJob.get(key);
+ if (amType != null) {
+ this.amType = amType;
+ }
+ }
+ return this;
+ }
+
+ public Builder withUser(String key) {
+ if (jsonJob.containsKey(key)) {
+ String user = (String) jsonJob.get(key);
+ if (user != null) {
+ this.user = user;
+ }
+ }
+ return this;
+ }
+
+ public Builder withQueue(String key) {
+ if (jsonJob.containsKey(key)) {
+ this.queue = jsonJob.get(key).toString();
+ }
+ return this;
+ }
+
+ public Builder withJobId(String key) {
+ if (jsonJob.containsKey(key)) {
+ this.jobId = (String) jsonJob.get(key);
+ }
+ return this;
+ }
+
+ public Builder withJobCount(String key) {
+ if (jsonJob.containsKey(key)) {
+ jobCount = Integer.parseInt(jsonJob.get(key).toString());
+ jobCount = Math.max(jobCount, 1);
+ }
+ return this;
+ }
+
+ public Builder withJobStartTime(String key) {
+ if (jsonJob.containsKey(key)) {
+ this.jobStartTime = Long.parseLong(jsonJob.get(key).toString());
+ }
+ return this;
+ }
+
+ public Builder withJobFinishTime(String key) {
+ if (jsonJob.containsKey(key)) {
+ this.jobFinishTime = Long.parseLong(jsonJob.get(key).toString());
+ }
+ return this;
+ }
+
+ public Builder withLabelExpression(String key) {
+ if (jsonJob.containsKey(key)) {
+ this.labelExpression = jsonJob.get(key).toString();
+ }
+ return this;
+ }
+
+ public AMDefinitionSLS.Builder withTaskContainers(
+ List<ContainerSimulator> taskContainers) {
+ this.taskContainers = taskContainers;
+ return this;
+ }
+
+ public AMDefinitionSLS.Builder withAmResource(Resource amResource) {
+ this.amResource = amResource;
+ return this;
+ }
+
+ public AMDefinitionSLS build() {
+ AMDefinitionSLS amDef = new AMDefinitionSLS(this);
+ // Job id is generated automatically if this job configuration allows
+ // multiple job instances
+ if (jobCount > 1) {
+ amDef.oldAppId = null;
+ } else {
+ amDef.oldAppId = jobId;
+ }
+ amDef.jobCount = jobCount;
+ return amDef;
+ }
+ }
+
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSynth.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSynth.java
new file mode 100644
index 0000000..db736f0
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSynth.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import static org.apache.hadoop.yarn.sls.AMDefinitionFactory.adjustTimeValuesToBaselineTime;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
+
+public class AMDefinitionSynth extends AMDefinition {
+ public AMDefinitionSynth(AmDefinitionBuilder builder) {
+ super(builder);
+ }
+
+ public static List<ContainerSimulator> getTaskContainers(
+ SynthJob job, SLSRunner slsRunner) throws YarnException {
+ List<ContainerSimulator> containerList = new ArrayList<>();
+ ArrayList<NodeId> keyAsArray = new ArrayList<>(
+ slsRunner.getNmMap().keySet());
+ Random rand = new Random(slsRunner.getStjp().getSeed());
+
+ for (SynthJob.SynthTask task : job.getTasks()) {
+ RMNode node = getRandomNode(slsRunner, keyAsArray, rand);
+ TaskContainerDefinition containerDef =
+ TaskContainerDefinition.Builder.create()
+ .withCount(1)
+ .withHostname("/" + node.getRackName() + "/" + node.getHostName())
+ .withDuration(task.getTime())
+ .withResource(Resource
+ .newInstance((int) task.getMemory(), (int) task.getVcores()))
+ .withPriority(task.getPriority())
+ .withType(task.getType())
+ .withExecutionType(task.getExecutionType())
+ .withAllocationId(-1)
+ .withRequestDelay(0)
+ .build();
+ containerList.add(
+ ContainerSimulator.createFromTaskContainerDefinition(containerDef));
+ }
+
+ return containerList;
+ }
+
+ private static RMNode getRandomNode(SLSRunner slsRunner,
+ ArrayList<NodeId> keyAsArray, Random rand) {
+ int randomIndex = rand.nextInt(keyAsArray.size());
+ return slsRunner.getNmMap().get(keyAsArray.get(randomIndex)).getNode();
+ }
+
+ public static final class Builder extends AmDefinitionBuilder {
+ private long baselineTimeMs;
+
+ private Builder() {
+ }
+
+ public static Builder create() {
+ return new Builder();
+ }
+
+ public Builder withAmType(String amType) {
+ this.amType = amType;
+ return this;
+ }
+
+ public Builder withUser(String user) {
+ if (user != null) {
+ this.user = user;
+ }
+ return this;
+ }
+
+ public Builder withQueue(String queue) {
+ this.queue = queue;
+ return this;
+ }
+
+ public Builder withJobId(String oldJobId) {
+ this.jobId = oldJobId;
+ return this;
+ }
+
+ public Builder withJobStartTime(long time) {
+ this.jobStartTime = time;
+ return this;
+ }
+
+ public Builder withJobFinishTime(long time) {
+ this.jobFinishTime = time;
+ return this;
+ }
+
+ public Builder withBaseLineTimeMs(long baselineTimeMs) {
+ this.baselineTimeMs = baselineTimeMs;
+ return this;
+ }
+
+ public AMDefinitionSynth.Builder withLabelExpression(String expr) {
+ this.labelExpression = expr;
+ return this;
+ }
+
+ public AMDefinitionSynth.Builder withTaskContainers(
+ List<ContainerSimulator> taskContainers) {
+ this.taskContainers = taskContainers;
+ return this;
+ }
+
+ public AMDefinitionSynth.Builder withAmResource(Resource amResource) {
+ this.amResource = amResource;
+ return this;
+ }
+
+ public AMDefinitionSynth build() {
+ AMDefinitionSynth amDef = new AMDefinitionSynth(this);
+
+ if (baselineTimeMs == 0) {
+ baselineTimeMs = jobStartTime;
+ }
+ adjustTimeValuesToBaselineTime(amDef, this, baselineTimeMs);
+ return amDef;
+ }
+ }
+
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/JobDefinition.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/JobDefinition.java
new file mode 100644
index 0000000..4a39d37
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/JobDefinition.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import java.util.Map;
+
+public class JobDefinition {
+ private AMDefinition amDefinition;
+ private ReservationId reservationId;
+ private long deadline;
+ private Map<String, String> params;
+
+ public AMDefinition getAmDefinition() {
+ return amDefinition;
+ }
+
+ public ReservationId getReservationId() {
+ return reservationId;
+ }
+
+ public long getDeadline() {
+ return deadline;
+ }
+
+ //Currently unused
+ public Map<String, String> getParams() {
+ return params;
+ }
+
+ public static final class Builder {
+ private AMDefinition amDefinition;
+ private ReservationId reservationId;
+ private long deadline;
+ private Map<String, String> params;
+
+ private Builder() {
+ }
+
+ public static Builder create() {
+ return new Builder();
+ }
+
+ public Builder withAmDefinition(AMDefinition amDefinition) {
+ this.amDefinition = amDefinition;
+ return this;
+ }
+
+ public Builder withReservationId(ReservationId reservationId) {
+ this.reservationId = reservationId;
+ return this;
+ }
+
+ public Builder withDeadline(long deadline) {
+ this.deadline = deadline;
+ return this;
+ }
+
+ public Builder withParams(Map<String, String> params) {
+ this.params = params;
+ return this;
+ }
+
+ public JobDefinition build() {
+ JobDefinition jobDef = new JobDefinition();
+ jobDef.params = this.params;
+ jobDef.amDefinition = this.amDefinition;
+ jobDef.reservationId = this.reservationId;
+ jobDef.deadline = this.deadline;
+ return jobDef;
+ }
+ }
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index a8d2aa6..83834e8 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -23,12 +23,10 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.security.Security;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
@@ -59,13 +57,10 @@ import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.TableMapping;
import org.apache.hadoop.tools.rumen.JobTraceReader;
import org.apache.hadoop.tools.rumen.LoggedJob;
-import org.apache.hadoop.tools.rumen.LoggedTask;
-import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeState;
@@ -89,7 +84,6 @@ import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler;
import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler;
-import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
@@ -138,13 +132,8 @@ public class SLSRunner extends Configured implements Tool {
// logger
public final static Logger LOG = LoggerFactory.getLogger(SLSRunner.class);
- private final static int DEFAULT_MAPPER_PRIORITY = 20;
- private final static int DEFAULT_REDUCER_PRIORITY = 10;
-
private static boolean exitAtTheFinish = false;
- private static final String DEFAULT_USER = "default";
-
/**
* The type of trace in input.
*/
@@ -472,7 +461,10 @@ public class SLSRunner extends Configured implements Tool {
while (jobIter.hasNext()) {
try {
- createAMForJob(jobIter.next());
+ Map jsonJob = jobIter.next();
+ AMDefinitionSLS amDef = AMDefinitionFactory.createFromSlsTrace(
+ jsonJob, this);
+ startAMs(amDef);
} catch (Exception e) {
LOG.error("Failed to create an AM: {}", e.getMessage());
}
@@ -480,150 +472,29 @@ public class SLSRunner extends Configured implements Tool {
}
}
- private void createAMForJob(Map jsonJob) throws YarnException {
- long jobStartTime = Long.parseLong(
- jsonJob.get(SLSConfiguration.JOB_START_MS).toString());
-
- long jobFinishTime = 0;
- if (jsonJob.containsKey(SLSConfiguration.JOB_END_MS)) {
- jobFinishTime = Long.parseLong(
- jsonJob.get(SLSConfiguration.JOB_END_MS).toString());
- }
-
- String jobLabelExpr = null;
- if (jsonJob.containsKey(SLSConfiguration.JOB_LABEL_EXPR)) {
- jobLabelExpr = jsonJob.get(SLSConfiguration.JOB_LABEL_EXPR).toString();
- }
-
- String user = (String) jsonJob.get(SLSConfiguration.JOB_USER);
- if (user == null) {
- user = "default";
- }
-
- String queue = jsonJob.get(SLSConfiguration.JOB_QUEUE_NAME).toString();
- increaseQueueAppNum(queue);
-
- String amType = (String)jsonJob.get(SLSConfiguration.AM_TYPE);
- if (amType == null) {
- amType = SLSUtils.DEFAULT_JOB_TYPE;
- }
-
- int jobCount = 1;
- if (jsonJob.containsKey(SLSConfiguration.JOB_COUNT)) {
- jobCount = Integer.parseInt(
- jsonJob.get(SLSConfiguration.JOB_COUNT).toString());
- }
- jobCount = Math.max(jobCount, 1);
-
- String oldAppId = (String)jsonJob.get(SLSConfiguration.JOB_ID);
- // Job id is generated automatically if this job configuration allows
- // multiple job instances
- if(jobCount > 1) {
- oldAppId = null;
- }
-
- for (int i = 0; i < jobCount; i++) {
- runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime,
- getTaskContainers(jsonJob), getAMContainerResource(jsonJob),
- jobLabelExpr);
- }
- }
-
- private List<ContainerSimulator> getTaskContainers(Map jsonJob)
- throws YarnException {
- List<ContainerSimulator> containers = new ArrayList<>();
- List tasks = (List) jsonJob.get(SLSConfiguration.JOB_TASKS);
- if (tasks == null || tasks.size() == 0) {
- throw new YarnException("No task for the job!");
- }
-
- for (Object o : tasks) {
- Map jsonTask = (Map) o;
-
- String hostname = (String) jsonTask.get(SLSConfiguration.TASK_HOST);
-
- long duration = 0;
- if (jsonTask.containsKey(SLSConfiguration.TASK_DURATION_MS)) {
- duration = Integer.parseInt(
- jsonTask.get(SLSConfiguration.TASK_DURATION_MS).toString());
- } else if (jsonTask.containsKey(SLSConfiguration.DURATION_MS)) {
- // Also support "duration.ms" for backward compatibility
- duration = Integer.parseInt(
- jsonTask.get(SLSConfiguration.DURATION_MS).toString());
- } else if (jsonTask.containsKey(SLSConfiguration.TASK_START_MS) &&
- jsonTask.containsKey(SLSConfiguration.TASK_END_MS)) {
- long taskStart = Long.parseLong(
- jsonTask.get(SLSConfiguration.TASK_START_MS).toString());
- long taskFinish = Long.parseLong(
- jsonTask.get(SLSConfiguration.TASK_END_MS).toString());
- duration = taskFinish - taskStart;
- }
- if (duration <= 0) {
- throw new YarnException("Duration of a task shouldn't be less or equal"
- + " to 0!");
- }
-
- Resource res = getResourceForContainer(jsonTask);
-
- int priority = DEFAULT_MAPPER_PRIORITY;
- if (jsonTask.containsKey(SLSConfiguration.TASK_PRIORITY)) {
- priority = Integer.parseInt(
- jsonTask.get(SLSConfiguration.TASK_PRIORITY).toString());
- }
-
- String type = "map";
- if (jsonTask.containsKey(SLSConfiguration.TASK_TYPE)) {
- type = jsonTask.get(SLSConfiguration.TASK_TYPE).toString();
- }
-
- int count = 1;
- if (jsonTask.containsKey(SLSConfiguration.COUNT)) {
- count = Integer.parseInt(
- jsonTask.get(SLSConfiguration.COUNT).toString());
- }
- count = Math.max(count, 1);
-
- ExecutionType executionType = ExecutionType.GUARANTEED;
- if (jsonTask.containsKey(SLSConfiguration.TASK_EXECUTION_TYPE)) {
- executionType = ExecutionType.valueOf(
- jsonTask.get(SLSConfiguration.TASK_EXECUTION_TYPE).toString());
- }
- long allocationId = -1;
- if (jsonTask.containsKey(SLSConfiguration.TASK_ALLOCATION_ID)) {
- allocationId = Long.parseLong(
- jsonTask.get(SLSConfiguration.TASK_ALLOCATION_ID).toString());
- }
-
- long requestDelay = 0;
- if (jsonTask.containsKey(SLSConfiguration.TASK_REQUEST_DELAY)) {
- requestDelay = Long.parseLong(
- jsonTask.get(SLSConfiguration.TASK_REQUEST_DELAY).toString());
- }
- requestDelay = Math.max(requestDelay, 0);
-
- for (int i = 0; i < count; i++) {
- containers.add(
- new ContainerSimulator(res, duration, hostname, priority, type,
- executionType, allocationId, requestDelay));
- }
+ private void startAMs(AMDefinition amDef) {
+ for (int i = 0; i < amDef.getJobCount(); i++) {
+ JobDefinition jobDef = JobDefinition.Builder.create()
+ .withAmDefinition(amDef)
+ .withDeadline(-1)
+ .withReservationId(null)
+ .withParams(null)
+ .build();
+ runNewAM(jobDef);
}
-
- return containers;
}
- private Resource getResourceForContainer(Map jsonTask) {
- Resource res = getDefaultContainerResource();
- ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
- for (ResourceInformation info : infors) {
- if (jsonTask.containsKey(SLSConfiguration.TASK_PREFIX + info.getName())) {
- long value = Long.parseLong(
- jsonTask.get(SLSConfiguration.TASK_PREFIX + info.getName())
- .toString());
- res.setResourceValue(info.getName(), value);
- }
+ private void startAMs(AMDefinition amDef, ReservationId reservationId,
+ Map<String, String> params, long deadline) {
+ for (int i = 0; i < amDef.getJobCount(); i++) {
+ JobDefinition jobDef = JobDefinition.Builder.create()
+ .withAmDefinition(amDef)
+ .withReservationId(reservationId)
+ .withParams(params)
+ .withDeadline(deadline)
+ .build();
+ runNewAM(jobDef);
}
-
- return res;
}
/**
@@ -642,76 +513,19 @@ public class SLSRunner extends Configured implements Tool {
while (job != null) {
try {
- createAMForJob(job, baselineTimeMS);
+ AMDefinitionRumen amDef =
+ AMDefinitionFactory.createFromRumenTrace(job, baselineTimeMS,
+ this);
+ startAMs(amDef);
} catch (Exception e) {
LOG.error("Failed to create an AM", e);
}
-
job = reader.getNext();
}
}
}
- private void createAMForJob(LoggedJob job, long baselineTimeMs)
- throws YarnException {
- String user = job.getUser() == null ? "default" :
- job.getUser().getValue();
- String jobQueue = job.getQueue().getValue();
- String oldJobId = job.getJobID().toString();
- long jobStartTimeMS = job.getSubmitTime();
- long jobFinishTimeMS = job.getFinishTime();
- if (baselineTimeMs == 0) {
- baselineTimeMs = job.getSubmitTime();
- }
- jobStartTimeMS -= baselineTimeMs;
- jobFinishTimeMS -= baselineTimeMs;
- if (jobStartTimeMS < 0) {
- LOG.warn("Warning: reset job {} start time to 0.", oldJobId);
- jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
- jobStartTimeMS = 0;
- }
-
- increaseQueueAppNum(jobQueue);
-
- List<ContainerSimulator> containerList = new ArrayList<>();
- // mapper
- for (LoggedTask mapTask : job.getMapTasks()) {
- if (mapTask.getAttempts().size() == 0) {
- throw new YarnException("Invalid map task, no attempt for a mapper!");
- }
- LoggedTaskAttempt taskAttempt =
- mapTask.getAttempts().get(mapTask.getAttempts().size() - 1);
- String hostname = taskAttempt.getHostName().getValue();
- long containerLifeTime = taskAttempt.getFinishTime() -
- taskAttempt.getStartTime();
- containerList.add(
- new ContainerSimulator(getDefaultContainerResource(),
- containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map"));
- }
-
- // reducer
- for (LoggedTask reduceTask : job.getReduceTasks()) {
- if (reduceTask.getAttempts().size() == 0) {
- throw new YarnException(
- "Invalid reduce task, no attempt for a reducer!");
- }
- LoggedTaskAttempt taskAttempt =
- reduceTask.getAttempts().get(reduceTask.getAttempts().size() - 1);
- String hostname = taskAttempt.getHostName().getValue();
- long containerLifeTime = taskAttempt.getFinishTime() -
- taskAttempt.getStartTime();
- containerList.add(
- new ContainerSimulator(getDefaultContainerResource(),
- containerLifeTime, hostname, DEFAULT_REDUCER_PRIORITY, "reduce"));
- }
-
- // Only supports the default job type currently
- runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId,
- jobStartTimeMS, jobFinishTimeMS, containerList,
- getAMContainerResource(null));
- }
-
- private Resource getDefaultContainerResource() {
+ Resource getDefaultContainerResource() {
int containerMemory = getConf().getInt(SLSConfiguration.CONTAINER_MEMORY_MB,
SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT);
int containerVCores = getConf().getInt(SLSConfiguration.CONTAINER_VCORES,
@@ -726,94 +540,26 @@ public class SLSRunner extends Configured implements Tool {
private void startAMFromSynthGenerator() throws YarnException, IOException {
Configuration localConf = new Configuration();
localConf.set("fs.defaultFS", "file:///");
- long baselineTimeMS = 0;
-
// if we use the nodeFile this could have been not initialized yet.
if (stjp == null) {
stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
}
- SynthJob job = null;
+ SynthJob job;
// we use stjp, a reference to the job producer instantiated during node
// creation
while ((job = (SynthJob) stjp.getNextJob()) != null) {
- // only support MapReduce currently
- String user = job.getUser() == null ? DEFAULT_USER :
- job.getUser();
- String jobQueue = job.getQueueName();
- String oldJobId = job.getJobID().toString();
- long jobStartTimeMS = job.getSubmissionTime();
-
- // CARLO: Finish time is only used for logging, omit for now
- long jobFinishTimeMS = jobStartTimeMS + job.getDuration();
-
- if (baselineTimeMS == 0) {
- baselineTimeMS = jobStartTimeMS;
- }
- jobStartTimeMS -= baselineTimeMS;
- jobFinishTimeMS -= baselineTimeMS;
- if (jobStartTimeMS < 0) {
- LOG.warn("Warning: reset job {} start time to 0.", oldJobId);
- jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
- jobStartTimeMS = 0;
- }
-
- increaseQueueAppNum(jobQueue);
-
- List<ContainerSimulator> containerList =
- new ArrayList<ContainerSimulator>();
- ArrayList<NodeId> keyAsArray = new ArrayList<NodeId>(nmMap.keySet());
- Random rand = new Random(stjp.getSeed());
-
- for (SynthJob.SynthTask task : job.getTasks()) {
- RMNode node = nmMap.get(keyAsArray.get(rand.nextInt(keyAsArray.size())))
- .getNode();
- String hostname = "/" + node.getRackName() + "/" + node.getHostName();
- long containerLifeTime = task.getTime();
- Resource containerResource = Resource
- .newInstance((int) task.getMemory(), (int) task.getVcores());
- containerList.add(
- new ContainerSimulator(containerResource, containerLifeTime,
- hostname, task.getPriority(), task.getType(),
- task.getExecutionType()));
- }
-
-
ReservationId reservationId = null;
-
- if(job.hasDeadline()){
+ if (job.hasDeadline()) {
reservationId = ReservationId
- .newInstance(this.rm.getStartTime(), AM_ID);
- }
-
- runNewAM(job.getType(), user, jobQueue, oldJobId,
- jobStartTimeMS, jobFinishTimeMS, containerList, reservationId,
- job.getDeadline(), getAMContainerResource(null), null,
- job.getParams());
- }
- }
-
- private Resource getAMContainerResource(Map jsonJob) {
- Resource amContainerResource =
- SLSConfiguration.getAMContainerResource(getConf());
-
- if (jsonJob == null) {
- return amContainerResource;
- }
-
- ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
- for (ResourceInformation info : infors) {
- String key = SLSConfiguration.JOB_AM_PREFIX + info.getName();
- if (jsonJob.containsKey(key)) {
- long value = Long.parseLong(jsonJob.get(key).toString());
- amContainerResource.setResourceValue(info.getName(), value);
+ .newInstance(rm.getStartTime(), AM_ID);
}
+ AMDefinitionSynth amDef = AMDefinitionFactory.createFromSynth(job, this);
+ startAMs(amDef, reservationId, job.getParams(), job.getDeadline());
}
-
- return amContainerResource;
}
- private void increaseQueueAppNum(String queue) throws YarnException {
+ void increaseQueueAppNum(String queue) throws YarnException {
SchedulerWrapper wrapper = (SchedulerWrapper)rm.getResourceScheduler();
String queueName = wrapper.getRealQueueName(queue);
Integer appNum = queueAppNumMap.get(queueName);
@@ -830,32 +576,16 @@ public class SLSRunner extends Configured implements Tool {
}
}
- private void runNewAM(String jobType, String user,
- String jobQueue, String oldJobId, long jobStartTimeMS,
- long jobFinishTimeMS, List<ContainerSimulator> containerList,
- Resource amContainerResource) {
- runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS,
- jobFinishTimeMS, containerList, null, -1,
- amContainerResource, null, null);
- }
-
- private void runNewAM(String jobType, String user,
- String jobQueue, String oldJobId, long jobStartTimeMS,
- long jobFinishTimeMS, List<ContainerSimulator> containerList,
- Resource amContainerResource, String labelExpr) {
- runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS,
- jobFinishTimeMS, containerList, null, -1,
- amContainerResource, labelExpr, null);
+ private AMSimulator createAmSimulator(String jobType) {
+ return (AMSimulator) ReflectionUtils.newInstance(
+ amClassMap.get(jobType), new Configuration());
}
- @SuppressWarnings("checkstyle:parameternumber")
- private void runNewAM(String jobType, String user,
- String jobQueue, String oldJobId, long jobStartTimeMS,
- long jobFinishTimeMS, List<ContainerSimulator> containerList,
- ReservationId reservationId, long deadline, Resource amContainerResource,
- String labelExpr, Map<String, String> params) {
- AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
- amClassMap.get(jobType), new Configuration());
+ private void runNewAM(JobDefinition jobDef) {
+ AMDefinition amDef = jobDef.getAmDefinition();
+ String oldJobId = amDef.getOldAppId();
+ AMSimulator amSim =
+ createAmSimulator(amDef.getAmType());
if (amSim != null) {
int heartbeatInterval = getConf().getInt(
@@ -867,19 +597,17 @@ public class SLSRunner extends Configured implements Tool {
oldJobId = Integer.toString(AM_ID);
}
AM_ID++;
- amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS,
- jobFinishTimeMS, user, jobQueue, isTracked, oldJobId,
- runner.getStartTimeMS(), amContainerResource, labelExpr, params,
- appIdAMSim);
- if(reservationId != null) {
+ amSim.init(amDef, rm, this, isTracked, runner.getStartTimeMS(), heartbeatInterval, appIdAMSim);
+ if (jobDef.getReservationId() != null) {
// if we have a ReservationId, delegate reservation creation to
// AMSim (reservation shape is impl specific)
UTCClock clock = new UTCClock();
- amSim.initReservation(reservationId, deadline, clock.getTime());
+ amSim.initReservation(jobDef.getReservationId(), jobDef.getDeadline(),
+ clock.getTime());
}
runner.schedule(amSim);
- maxRuntime = Math.max(maxRuntime, jobFinishTimeMS);
- numTasks += containerList.size();
+ maxRuntime = Math.max(maxRuntime, amDef.getJobFinishTime());
+ numTasks += amDef.getTaskContainers().size();
amMap.put(oldJobId, amSim);
}
}
@@ -1121,4 +849,12 @@ public class SLSRunner extends Configured implements Tool {
return result;
}
}
+
+ public ResourceManager getRm() {
+ return rm;
+ }
+
+ public SynthTraceJobProducer getStjp() {
+ return stjp;
+ }
}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/TaskContainerDefinition.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/TaskContainerDefinition.java
new file mode 100644
index 0000000..1b0cd90
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/TaskContainerDefinition.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.sls.AMDefinitionRumen.DEFAULT_MAPPER_PRIORITY;
+
+public class TaskContainerDefinition {
+ private long duration;
+ private Resource resource;
+ private int priority;
+ private String type;
+ private int count;
+ private ExecutionType executionType;
+ private long allocationId = -1;
+ private long requestDelay = 0;
+ private String hostname;
+
+ public long getDuration() {
+ return duration;
+ }
+
+ public Resource getResource() {
+ return resource;
+ }
+
+ public int getPriority() {
+ return priority;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public int getCount() {
+ return count;
+ }
+
+ public ExecutionType getExecutionType() {
+ return executionType;
+ }
+
+ public long getAllocationId() {
+ return allocationId;
+ }
+
+ public long getRequestDelay() {
+ return requestDelay;
+ }
+
+ public String getHostname() {
+ return hostname;
+ }
+
+ public static final class Builder {
+ private long duration = -1;
+ private long durationLegacy = -1;
+ private long taskStart = -1;
+ private long taskFinish = -1;
+ private Resource resource;
+ private int priority = DEFAULT_MAPPER_PRIORITY;
+ private String type = "map";
+ private int count = 1;
+ private ExecutionType executionType = ExecutionType.GUARANTEED;
+ private long allocationId = -1;
+ private long requestDelay = 0;
+ private String hostname;
+
+ public static Builder create() {
+ return new Builder();
+ }
+
+ public Builder withDuration(Map<?, ?> jsonTask, String key) {
+ if (jsonTask.containsKey(key)) {
+ this.duration = Integer.parseInt(jsonTask.get(key).toString());
+ }
+ return this;
+ }
+
+ public Builder withDuration(long duration) {
+ this.duration = duration;
+ return this;
+ }
+
+ /**
+ * Also support "duration.ms" for backward compatibility.
+ * @param jsonTask the json representation of the task.
+ * @param key The json key.
+ * @return the builder
+ */
+ public Builder withDurationLegacy(Map<?, ?> jsonTask, String key) {
+ if (jsonTask.containsKey(key)) {
+ this.durationLegacy = Integer.parseInt(jsonTask.get(key).toString());
+ }
+ return this;
+ }
+
+ public Builder withTaskStart(Map<?, ?> jsonTask, String key) {
+ if (jsonTask.containsKey(key)) {
+ this.taskStart = Long.parseLong(jsonTask.get(key).toString());
+ }
+ return this;
+ }
+
+ public Builder withTaskFinish(Map<?, ?> jsonTask, String key) {
+ if (jsonTask.containsKey(key)) {
+ this.taskFinish = Long.parseLong(jsonTask.get(key).toString());
+ }
+ return this;
+ }
+
+ public Builder withResource(Resource resource) {
+ this.resource = resource;
+ return this;
+ }
+
+ public Builder withPriority(Map<?, ?> jsonTask, String key) {
+ if (jsonTask.containsKey(key)) {
+ this.priority = Integer.parseInt(jsonTask.get(key).toString());
+ }
+ return this;
+ }
+
+ public Builder withPriority(int priority) {
+ this.priority = priority;
+ return this;
+ }
+
+ public Builder withType(Map<?, ?> jsonTask, String key) {
+ if (jsonTask.containsKey(key)) {
+ this.type = jsonTask.get(key).toString();
+ }
+ return this;
+ }
+
+ public Builder withType(String type) {
+ this.type = type;
+ return this;
+ }
+
+ public Builder withCount(Map<?, ?> jsonTask, String key) {
+ if (jsonTask.containsKey(key)) {
+ count = Integer.parseInt(jsonTask.get(key).toString());
+ count = Math.max(count, 1);
+ }
+ return this;
+ }
+
+ public Builder withCount(int count) {
+ this.count = count;
+ return this;
+ }
+
+ public Builder withExecutionType(Map<?, ?> jsonTask, String key) {
+ if (jsonTask.containsKey(key)) {
+ this.executionType = ExecutionType.valueOf(
+ jsonTask.get(key).toString());
+ }
+ return this;
+ }
+
+ public Builder withExecutionType(ExecutionType executionType) {
+ this.executionType = executionType;
+ return this;
+ }
+
+ public Builder withAllocationId(Map<?, ?> jsonTask, String key) {
+ if (jsonTask.containsKey(key)) {
+ this.allocationId = Long.parseLong(jsonTask.get(key).toString());
+ }
+ return this;
+ }
+
+ public Builder withAllocationId(long allocationId) {
+ this.allocationId = allocationId;
+ return this;
+ }
+
+ public Builder withRequestDelay(Map<?, ?> jsonTask, String key) {
+ if (jsonTask.containsKey(key)) {
+ requestDelay = Long.parseLong(jsonTask.get(key).toString());
+ requestDelay = Math.max(requestDelay, 0);
+ }
+ return this;
+ }
+
+ public Builder withRequestDelay(long requestDelay) {
+ this.requestDelay = requestDelay;
+ return this;
+ }
+
+ public Builder withHostname(String hostname) {
+ this.hostname = hostname;
+ return this;
+ }
+
+ public TaskContainerDefinition build() throws YarnException {
+ TaskContainerDefinition taskContainerDef =
+ new TaskContainerDefinition();
+ taskContainerDef.duration = validateAndGetDuration(this);
+ taskContainerDef.resource = this.resource;
+ taskContainerDef.type = this.type;
+ taskContainerDef.requestDelay = this.requestDelay;
+ taskContainerDef.priority = this.priority;
+ taskContainerDef.count = this.count;
+ taskContainerDef.allocationId = this.allocationId;
+ taskContainerDef.executionType = this.executionType;
+ taskContainerDef.hostname = this.hostname;
+ return taskContainerDef;
+ }
+
+ private long validateAndGetDuration(Builder builder) throws YarnException {
+ long duration = 0;
+
+ if (builder.duration != -1) {
+ duration = builder.duration;
+ } else if (builder.durationLegacy != -1) {
+ duration = builder.durationLegacy;
+ } else if (builder.taskStart != -1 && builder.taskFinish != -1) {
+ duration = builder.taskFinish - builder.taskStart;
+ }
+
+ if (duration <= 0) {
+ throw new YarnException("Duration of a task shouldn't be less or equal"
+ + " to 0!");
+ }
+ return duration;
+ }
+ }
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
index 922f9a2..0a87a6c 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.sls.AMDefinition;
import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
@@ -128,27 +129,25 @@ public abstract class AMSimulator extends TaskRunner.Task {
this.responseQueue = new LinkedBlockingQueue<>();
}
- @SuppressWarnings("checkstyle:parameternumber")
- public void init(int heartbeatInterval,
- List<ContainerSimulator> containerList, ResourceManager resourceManager,
- SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
- String simQueue, boolean tracked, String oldApp, long baseTimeMS,
- Resource amResource, String nodeLabelExpr, Map<String, String> params,
- Map<ApplicationId, AMSimulator> appIdAMSim) {
- super.init(startTime, startTime + 1000000L * heartbeatInterval,
- heartbeatInterval);
- this.user = simUser;
- this.rm = resourceManager;
- this.se = slsRunnner;
- this.queue = simQueue;
- this.oldAppId = oldApp;
+ public void init(AMDefinition amDef, ResourceManager rm, SLSRunner slsRunner,
+ boolean tracked, long baselineTimeMS, long heartbeatInterval,
+ Map<ApplicationId, AMSimulator> appIdToAMSim) {
+ long startTime = amDef.getJobStartTime();
+ long endTime = startTime + 1000000L * heartbeatInterval;
+ super.init(startTime, endTime, heartbeatInterval);
+
+ this.user = amDef.getUser();
+ this.queue = amDef.getQueue();
+ this.oldAppId = amDef.getOldAppId();
+ this.amContainerResource = amDef.getAmResource();
+ this.nodeLabelExpression = amDef.getLabelExpression();
+ this.traceStartTimeMS = amDef.getJobStartTime();
+ this.traceFinishTimeMS = amDef.getJobFinishTime();
+ this.rm = rm;
+ this.se = slsRunner;
this.isTracked = tracked;
- this.baselineTimeMS = baseTimeMS;
- this.traceStartTimeMS = startTime;
- this.traceFinishTimeMS = finishTime;
- this.amContainerResource = amResource;
- this.nodeLabelExpression = nodeLabelExpr;
- this.appIdToAMSim = appIdAMSim;
+ this.baselineTimeMS = baselineTimeMS;
+ this.appIdToAMSim = appIdToAMSim;
}
/**
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java
index 83467e0..418408d 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java
@@ -32,10 +32,10 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.sls.AMDefinition;
import org.apache.hadoop.yarn.sls.SLSRunner;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.slf4j.Logger;
@@ -93,19 +93,15 @@ public class DAGAMSimulator extends AMSimulator {
LoggerFactory.getLogger(DAGAMSimulator.class);
@SuppressWarnings("checkstyle:parameternumber")
- public void init(int heartbeatInterval,
- List<ContainerSimulator> containerList, ResourceManager resourceManager,
- SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
- String simQueue, boolean tracked, String oldApp, long baseTimeMS,
- Resource amResource, String nodeLabelExpr, Map<String, String> params,
- Map<ApplicationId, AMSimulator> appIdAMSim) {
- super.init(heartbeatInterval, containerList, resourceManager, slsRunnner,
- startTime, finishTime, simUser, simQueue, tracked, oldApp, baseTimeMS,
- amResource, nodeLabelExpr, params, appIdAMSim);
+ public void init(AMDefinition amDef, ResourceManager rm, SLSRunner slsRunner,
+ boolean tracked, long baselineTimeMS, long heartbeatInterval,
+ Map<ApplicationId, AMSimulator> appIdToAMSim) {
+ super.init(amDef, rm, slsRunner, tracked, baselineTimeMS, heartbeatInterval,
+ appIdToAMSim);
super.amtype = "dag";
- allContainers.addAll(containerList);
- pendingContainers.addAll(containerList);
+ allContainers.addAll(amDef.getTaskContainers());
+ pendingContainers.addAll(amDef.getTaskContainers());
totalContainers = allContainers.size();
LOG.info("Added new job with {} containers", allContainers.size());
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
index 184fdca..976c022 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.sls.AMDefinition;
import org.apache.hadoop.yarn.sls.ReservationClientUtil;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.apache.hadoop.yarn.sls.SLSRunner;
@@ -123,19 +124,15 @@ public class MRAMSimulator extends AMSimulator {
LoggerFactory.getLogger(MRAMSimulator.class);
@SuppressWarnings("checkstyle:parameternumber")
- public void init(int heartbeatInterval,
- List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
- long traceStartTime, long traceFinishTime, String user, String queue,
- boolean isTracked, String oldAppId, long baselineStartTimeMS,
- Resource amContainerResource, String nodeLabelExpr,
- Map<String, String> params, Map<ApplicationId, AMSimulator> appIdAMSim) {
- super.init(heartbeatInterval, containerList, rm, se, traceStartTime,
- traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS,
- amContainerResource, nodeLabelExpr, params, appIdAMSim);
+ public void init(AMDefinition amDef, ResourceManager rm, SLSRunner slsRunner,
+ boolean tracked, long baselineTimeMS, long heartbeatInterval,
+ Map<ApplicationId, AMSimulator> appIdToAMSim) {
+ super.init(amDef, rm, slsRunner, tracked, baselineTimeMS,
+ heartbeatInterval, appIdToAMSim);
amtype = "mapreduce";
// get map/reduce tasks
- for (ContainerSimulator cs : containerList) {
+ for (ContainerSimulator cs : amDef.getTaskContainers()) {
if (cs.getType().equals("map")) {
cs.setPriority(PRIORITY_MAP);
allMaps.add(cs);
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
index 7e35451..09297af 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
@@ -30,11 +30,11 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.sls.AMDefinition;
import org.apache.hadoop.yarn.sls.SLSRunner;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.slf4j.Logger;
@@ -93,21 +93,14 @@ public class StreamAMSimulator extends AMSimulator {
LoggerFactory.getLogger(StreamAMSimulator.class);
@SuppressWarnings("checkstyle:parameternumber")
- public void init(int heartbeatInterval,
- List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
- long traceStartTime, long traceFinishTime, String user, String queue,
- boolean isTracked, String oldAppId, long baselineStartTimeMS,
- Resource amContainerResource, String nodeLabelExpr,
- Map<String, String> params, Map<ApplicationId, AMSimulator> appIdAMSim) {
- super.init(heartbeatInterval, containerList, rm, se, traceStartTime,
- traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS,
- amContainerResource, nodeLabelExpr, params, appIdAMSim);
+ public void init(AMDefinition amDef, ResourceManager rm, SLSRunner slsRunner,
+ boolean tracked, long baselineTimeMS, long heartbeatInterval,
+ Map<ApplicationId, AMSimulator> appIdToAMSim) {
+ super.init(amDef, rm, slsRunner, tracked, baselineTimeMS,
+ heartbeatInterval, appIdToAMSim);
amtype = "stream";
-
- allStreams.addAll(containerList);
-
- duration = traceFinishTime - traceStartTime;
-
+ allStreams.addAll(amDef.getTaskContainers());
+ duration = amDef.getJobFinishTime() - amDef.getJobStartTime();
LOG.info("Added new job with {} streams, running for {}",
allStreams.size(), duration);
}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java
index e83ee91..8f11994 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java
@@ -26,54 +26,41 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.sls.TaskContainerDefinition;
@Private
@Unstable
public class ContainerSimulator implements Delayed {
- // id
private ContainerId id;
- // resource allocated
private Resource resource;
- // end time
private long endTime;
// life time (ms)
private long lifeTime;
// time(ms) after which container would be requested by AM
private long requestDelay;
- // host name
private String hostname;
- // priority
private int priority;
- // type
private String type;
- // execution type
private ExecutionType executionType = ExecutionType.GUARANTEED;
- // allocation id
private long allocationId;
/**
- * invoked when AM schedules containers to allocate.
+ * Invoked when AM schedules containers to allocate.
+ * @param def The task's definition object.
+ * @return ContainerSimulator object
*/
- public ContainerSimulator(Resource resource, long lifeTime,
- String hostname, int priority, String type) {
- this(resource, lifeTime, hostname, priority, type,
- ExecutionType.GUARANTEED);
+ public static ContainerSimulator createFromTaskContainerDefinition(
+ TaskContainerDefinition def) {
+ return new ContainerSimulator(def.getResource(), def.getDuration(),
+ def.getHostname(), def.getPriority(), def.getType(),
+ def.getExecutionType(), def.getAllocationId(), def.getRequestDelay());
}
/**
- * invoked when AM schedules containers to allocate.
- */
- public ContainerSimulator(Resource resource, long lifeTime,
- String hostname, int priority, String type, ExecutionType executionType) {
- this(resource, lifeTime, hostname, priority, type,
- executionType, -1, 0);
- }
-
- /**
- * invoked when AM schedules containers to allocate.
+ * Invoked when AM schedules containers to allocate.
*/
@SuppressWarnings("checkstyle:parameternumber")
- public ContainerSimulator(Resource resource, long lifeTime,
+ private ContainerSimulator(Resource resource, long lifeTime,
String hostname, int priority, String type, ExecutionType executionType,
long allocationId, long requestDelay) {
this.resource = resource;
@@ -87,7 +74,7 @@ public class ContainerSimulator implements Delayed {
}
/**
- * invoke when NM schedules containers to run.
+ * Invoked when NM schedules containers to run.
*/
public ContainerSimulator(ContainerId id, Resource resource, long endTime,
long lifeTime, long allocationId) {
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
index 256dcf4..e529d18 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
@@ -57,8 +57,6 @@ import org.apache.hadoop.yarn.util.resource.Resources;
@Private
@Unstable
public class SLSUtils {
- public final static String DEFAULT_JOB_TYPE = "mapreduce";
-
private static final String LABEL_FORMAT_ERR_MSG =
"Input format for adding node-labels is not correct, it should be "
+ "labelName1[(exclusive=true/false)],labelName2[] ..";
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestDagAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestDagAMSimulator.java
index 8ac7fff..e458b86 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestDagAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestDagAMSimulator.java
@@ -26,6 +26,8 @@ import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* Tests for DagAMSimulator.
@@ -74,7 +76,17 @@ public class TestDagAMSimulator {
private ContainerSimulator createContainerSim(long allocationId,
long requestDelay) {
- return new ContainerSimulator(null, 1000, "*", 1, "Map",
- null, allocationId, requestDelay);
+ TaskContainerDefinition taskContainerDef =
+ mock(TaskContainerDefinition.class);
+ when(taskContainerDef.getResource()).thenReturn(null);
+ when(taskContainerDef.getDuration()).thenReturn(1000L);
+ when(taskContainerDef.getHostname()).thenReturn("*");
+ when(taskContainerDef.getPriority()).thenReturn(1);
+ when(taskContainerDef.getType()).thenReturn("Map");
+ when(taskContainerDef.getExecutionType()).thenReturn(null);
+ when(taskContainerDef.getAllocationId()).thenReturn(allocationId);
+ when(taskContainerDef.getRequestDelay()).thenReturn(requestDelay);
+ return ContainerSimulator.createFromTaskContainerDefinition(
+ taskContainerDef);
}
}
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
index 50ac700..f5db168 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.sls.appmaster;
import com.codahale.metrics.MetricRegistry;
import java.util.HashMap;
import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.tools.rumen.datatypes.UserName;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -33,6 +34,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.sls.AMDefinitionRumen;
+import org.apache.hadoop.yarn.sls.TaskContainerDefinition;
import org.apache.hadoop.yarn.sls.SLSRunner;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
@@ -57,6 +60,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(Parameterized.class)
@@ -157,9 +161,20 @@ public class TestAMSimulator {
String queue = "default";
List<ContainerSimulator> containers = new ArrayList<>();
HashMap<ApplicationId, AMSimulator> map = new HashMap<>();
- app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true,
- appId, 0, SLSConfiguration.getAMContainerResource(conf), null, null,
- map);
+
+ UserName mockUser = mock(UserName.class);
+ when(mockUser.getValue()).thenReturn("user1");
+ AMDefinitionRumen amDef =
+ AMDefinitionRumen.Builder.create()
+ .withUser(mockUser)
+ .withQueue(queue)
+ .withJobId(appId)
+ .withJobStartTime(0)
+ .withJobFinishTime(1000000L)
+ .withAmResource(SLSConfiguration.getAMContainerResource(conf))
+ .withTaskContainers(containers)
+ .build();
+ app.init(amDef, rm, null, true, 0, 1000, map);
app.firstStep();
verifySchedulerMetrics(appId);
@@ -184,9 +199,21 @@ public class TestAMSimulator {
String queue = "default";
List<ContainerSimulator> containers = new ArrayList<>();
HashMap<ApplicationId, AMSimulator> map = new HashMap<>();
- app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true,
- appId, 0, SLSConfiguration.getAMContainerResource(conf), "label1",
- null, map);
+
+ UserName mockUser = mock(UserName.class);
+ when(mockUser.getValue()).thenReturn("user1");
+ AMDefinitionRumen amDef =
+ AMDefinitionRumen.Builder.create()
+ .withUser(mockUser)
+ .withQueue(queue)
+ .withJobId(appId)
+ .withJobStartTime(0)
+ .withJobFinishTime(1000000L)
+ .withAmResource(SLSConfiguration.getAMContainerResource(conf))
+ .withTaskContainers(containers)
+ .withLabelExpression("label1")
+ .build();
+ app.init(amDef, rm, null, true, 0, 1000, map);
app.firstStep();
verifySchedulerMetrics(appId);
@@ -201,7 +228,7 @@ public class TestAMSimulator {
}
@Test
- public void testPackageRequests() {
+ public void testPackageRequests() throws YarnException {
MockAMSimulator app = new MockAMSimulator();
List<ContainerSimulator> containerSimulators = new ArrayList<>();
Resource resource = Resources.createResource(1024);
@@ -209,12 +236,25 @@ public class TestAMSimulator {
ExecutionType execType = ExecutionType.GUARANTEED;
String type = "map";
- ContainerSimulator s1 = new ContainerSimulator(resource, 100,
- "/default-rack/h1", priority, type, execType);
- ContainerSimulator s2 = new ContainerSimulator(resource, 100,
- "/default-rack/h1", priority, type, execType);
- ContainerSimulator s3 = new ContainerSimulator(resource, 100,
- "/default-rack/h2", priority, type, execType);
+ TaskContainerDefinition.Builder builder =
+ TaskContainerDefinition.Builder.create()
+ .withResource(resource)
+ .withDuration(100)
+ .withPriority(1)
+ .withType(type)
+ .withExecutionType(execType)
+ .withAllocationId(-1)
+ .withRequestDelay(0);
+
+ ContainerSimulator s1 = ContainerSimulator
+ .createFromTaskContainerDefinition(
+ builder.withHostname("/default-rack/h1").build());
+ ContainerSimulator s2 = ContainerSimulator
+ .createFromTaskContainerDefinition(
+ builder.withHostname("/default-rack/h1").build());
+ ContainerSimulator s3 = ContainerSimulator
+ .createFromTaskContainerDefinition(
+ builder.withHostname("/default-rack/h2").build());
containerSimulators.add(s1);
containerSimulators.add(s2);
@@ -250,12 +290,15 @@ public class TestAMSimulator {
Assert.assertEquals(2, nodeRequestCount);
containerSimulators.clear();
- s1 = new ContainerSimulator(resource, 100,
- "/default-rack/h1", priority, type, execType, 1, 0);
- s2 = new ContainerSimulator(resource, 100,
- "/default-rack/h1", priority, type, execType, 2, 0);
- s3 = new ContainerSimulator(resource, 100,
- "/default-rack/h2", priority, type, execType, 1, 0);
+ s1 = ContainerSimulator.createFromTaskContainerDefinition(
+ createDefaultTaskContainerDefMock(resource, priority, execType, type,
+ "/default-rack/h1", 1));
+ s2 = ContainerSimulator.createFromTaskContainerDefinition(
+ createDefaultTaskContainerDefMock(resource, priority, execType, type,
+ "/default-rack/h1", 2));
+ s3 = ContainerSimulator.createFromTaskContainerDefinition(
+ createDefaultTaskContainerDefMock(resource, priority, execType, type,
+ "/default-rack/h2", 1));
containerSimulators.add(s1);
containerSimulators.add(s2);
@@ -317,6 +360,20 @@ public class TestAMSimulator {
Assert.assertFalse(nm.getNode().getRunningApps().contains(app.appId));
Assert.assertTrue(nm.getNode().getRunningApps().isEmpty());
}
+ private TaskContainerDefinition createDefaultTaskContainerDefMock(
+ Resource resource, int priority, ExecutionType execType, String type,
+ String hostname, long allocationId) {
+ TaskContainerDefinition taskContainerDef =
+ mock(TaskContainerDefinition.class);
+ when(taskContainerDef.getResource()).thenReturn(resource);
+ when(taskContainerDef.getDuration()).thenReturn(100L);
+ when(taskContainerDef.getPriority()).thenReturn(priority);
+ when(taskContainerDef.getType()).thenReturn(type);
+ when(taskContainerDef.getExecutionType()).thenReturn(execType);
+ when(taskContainerDef.getHostname()).thenReturn(hostname);
+ when(taskContainerDef.getAllocationId()).thenReturn(allocationId);
+ return taskContainerDef;
+ }
@After
public void tearDown() {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org