Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2018/02/21 01:14:56 UTC
[10/10] hadoop git commit: YARN-7732. Support Generic AM Simulator from SynthGenerator. (Contributed by Young Chen via curino)
YARN-7732. Support Generic AM Simulator from SynthGenerator. (Contributed by Young Chen via curino)
(cherry picked from commit 84cea0011ffe510d24cf9f2952944f7a6fe622cf)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5c6adb3d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5c6adb3d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5c6adb3d
Branch: refs/heads/branch-3
Commit: 5c6adb3dd34bc9da151591ecccc7b75fd371a234
Parents: ea06ad9
Author: Carlo Curino <cu...@apache.org>
Authored: Tue Feb 20 17:00:34 2018 -0800
Committer: Carlo Curino <cu...@apache.org>
Committed: Tue Feb 20 17:04:20 2018 -0800
----------------------------------------------------------------------
hadoop-tools/hadoop-sls/pom.xml | 2 +
.../org/apache/hadoop/yarn/sls/SLSRunner.java | 137 +++---
.../hadoop/yarn/sls/appmaster/AMSimulator.java | 2 +-
.../yarn/sls/appmaster/MRAMSimulator.java | 7 +-
.../yarn/sls/appmaster/StreamAMSimulator.java | 273 +++++++++++
.../hadoop/yarn/sls/appmaster/package-info.java | 21 +
.../hadoop/yarn/sls/synthetic/SynthJob.java | 367 ++++++++------
.../yarn/sls/synthetic/SynthJobClass.java | 180 -------
.../sls/synthetic/SynthTraceJobProducer.java | 487 ++++++++++++++++---
.../yarn/sls/synthetic/SynthWorkload.java | 121 -----
.../hadoop/yarn/sls/BaseSLSRunnerTest.java | 2 +-
.../hadoop/yarn/sls/TestSLSGenericSynth.java | 76 +++
.../hadoop/yarn/sls/TestSLSStreamAMSynth.java | 76 +++
.../hadoop/yarn/sls/TestSynthJobGeneration.java | 213 +++++++-
.../yarn/sls/appmaster/TestAMSimulator.java | 2 +-
.../src/test/resources/sls-runner.xml | 4 +
.../hadoop-sls/src/test/resources/syn.json | 2 +-
.../src/test/resources/syn_generic.json | 54 ++
.../src/test/resources/syn_stream.json | 46 ++
19 files changed, 1430 insertions(+), 642 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c6adb3d/hadoop-tools/hadoop-sls/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/pom.xml b/hadoop-tools/hadoop-sls/pom.xml
index c123538..cf41cf4 100644
--- a/hadoop-tools/hadoop-sls/pom.xml
+++ b/hadoop-tools/hadoop-sls/pom.xml
@@ -133,6 +133,8 @@
<exclude>src/test/resources/simulate.info.html.template</exclude>
<exclude>src/test/resources/track.html.template</exclude>
<exclude>src/test/resources/syn.json</exclude>
+ <exclude>src/test/resources/syn_generic.json</exclude>
+ <exclude>src/test/resources/syn_stream.json</exclude>
<exclude>src/test/resources/inputsls.json</exclude>
<exclude>src/test/resources/nodes.json</exclude>
<exclude>src/test/resources/exit-invariants.txt</exclude>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c6adb3d/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 456602f..951c09d 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -47,13 +47,11 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.tools.rumen.JobTraceReader;
import org.apache.hadoop.tools.rumen.LoggedJob;
import org.apache.hadoop.tools.rumen.LoggedTask;
import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
-import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -627,89 +625,66 @@ public class SLSRunner extends Configured implements Tool {
localConf.set("fs.defaultFS", "file:///");
long baselineTimeMS = 0;
- try {
+ // if we use the nodeFile this could have been not initialized yet.
+ if (stjp == null) {
+ stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
+ }
- // if we use the nodeFile this could have been not initialized yet.
- if (stjp == null) {
- stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
- }
+ SynthJob job = null;
+ // we use stjp, a reference to the job producer instantiated during node
+ // creation
+ while ((job = (SynthJob) stjp.getNextJob()) != null) {
+ // only support MapReduce currently
+ String user = job.getUser();
+ String jobQueue = job.getQueueName();
+ String oldJobId = job.getJobID().toString();
+ long jobStartTimeMS = job.getSubmissionTime();
- SynthJob job = null;
- // we use stjp, a reference to the job producer instantiated during node
- // creation
- while ((job = (SynthJob) stjp.getNextJob()) != null) {
- // only support MapReduce currently
- String user = job.getUser();
- String jobQueue = job.getQueueName();
- String oldJobId = job.getJobID().toString();
- long jobStartTimeMS = job.getSubmissionTime();
-
- // CARLO: Finish time is only used for logging, omit for now
- long jobFinishTimeMS = -1L;
-
- if (baselineTimeMS == 0) {
- baselineTimeMS = jobStartTimeMS;
- }
- jobStartTimeMS -= baselineTimeMS;
- jobFinishTimeMS -= baselineTimeMS;
- if (jobStartTimeMS < 0) {
- LOG.warn("Warning: reset job {} start time to 0.", oldJobId);
- jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
- jobStartTimeMS = 0;
- }
-
- increaseQueueAppNum(jobQueue);
-
- List<ContainerSimulator> containerList =
- new ArrayList<ContainerSimulator>();
- ArrayList<NodeId> keyAsArray = new ArrayList<NodeId>(nmMap.keySet());
- Random rand = new Random(stjp.getSeed());
-
- // map tasks
- for (int i = 0; i < job.getNumberMaps(); i++) {
- TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.MAP, i, 0);
- RMNode node =
- nmMap.get(keyAsArray.get(rand.nextInt(keyAsArray.size())))
- .getNode();
- String hostname = "/" + node.getRackName() + "/" + node.getHostName();
- long containerLifeTime = tai.getRuntime();
- Resource containerResource =
- Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(),
- (int) tai.getTaskInfo().getTaskVCores());
- containerList.add(new ContainerSimulator(containerResource,
- containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map"));
- }
+ // CARLO: Finish time is only used for logging, omit for now
+ long jobFinishTimeMS = jobStartTimeMS + job.getDuration();
- // reduce tasks
- for (int i = 0; i < job.getNumberReduces(); i++) {
- TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.REDUCE, i, 0);
- RMNode node =
- nmMap.get(keyAsArray.get(rand.nextInt(keyAsArray.size())))
- .getNode();
- String hostname = "/" + node.getRackName() + "/" + node.getHostName();
- long containerLifeTime = tai.getRuntime();
- Resource containerResource =
- Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(),
- (int) tai.getTaskInfo().getTaskVCores());
- containerList.add(
- new ContainerSimulator(containerResource, containerLifeTime,
- hostname, DEFAULT_REDUCER_PRIORITY, "reduce"));
- }
+ if (baselineTimeMS == 0) {
+ baselineTimeMS = jobStartTimeMS;
+ }
+ jobStartTimeMS -= baselineTimeMS;
+ jobFinishTimeMS -= baselineTimeMS;
+ if (jobStartTimeMS < 0) {
+ LOG.warn("Warning: reset job {} start time to 0.", oldJobId);
+ jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
+ jobStartTimeMS = 0;
+ }
- ReservationId reservationId = null;
+ increaseQueueAppNum(jobQueue);
+
+ List<ContainerSimulator> containerList =
+ new ArrayList<ContainerSimulator>();
+ ArrayList<NodeId> keyAsArray = new ArrayList<NodeId>(nmMap.keySet());
+ Random rand = new Random(stjp.getSeed());
+
+ for (SynthJob.SynthTask task : job.getTasks()) {
+ RMNode node = nmMap.get(keyAsArray.get(rand.nextInt(keyAsArray.size())))
+ .getNode();
+ String hostname = "/" + node.getRackName() + "/" + node.getHostName();
+ long containerLifeTime = task.getTime();
+ Resource containerResource = Resource
+ .newInstance((int) task.getMemory(), (int) task.getVcores());
+ containerList.add(
+ new ContainerSimulator(containerResource, containerLifeTime,
+ hostname, task.getPriority(), task.getType()));
+ }
- if (job.hasDeadline()) {
- reservationId =
- ReservationId.newInstance(this.rm.getStartTime(), AM_ID);
- }
- runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId,
- jobStartTimeMS, jobFinishTimeMS, containerList, reservationId,
- job.getDeadline(), getAMContainerResource(null));
+ ReservationId reservationId = null;
+ if(job.hasDeadline()){
+ reservationId = ReservationId
+ .newInstance(this.rm.getStartTime(), AM_ID);
}
- } finally {
- stjp.close();
+
+ runNewAM(job.getType(), user, jobQueue, oldJobId,
+ jobStartTimeMS, jobFinishTimeMS, containerList, reservationId,
+ job.getDeadline(), getAMContainerResource(null),
+ job.getParams());
}
}
@@ -753,14 +728,14 @@ public class SLSRunner extends Configured implements Tool {
Resource amContainerResource) {
runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS,
jobFinishTimeMS, containerList, null, -1,
- amContainerResource);
+ amContainerResource, null);
}
private void runNewAM(String jobType, String user,
String jobQueue, String oldJobId, long jobStartTimeMS,
long jobFinishTimeMS, List<ContainerSimulator> containerList,
- ReservationId reservationId, long deadline,
- Resource amContainerResource) {
+ ReservationId reservationId, long deadline, Resource amContainerResource,
+ Map<String, String> params) {
AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
amClassMap.get(jobType), new Configuration());
@@ -777,7 +752,7 @@ public class SLSRunner extends Configured implements Tool {
AM_ID++;
amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS,
jobFinishTimeMS, user, jobQueue, isTracked, oldJobId,
- runner.getStartTimeMS(), amContainerResource);
+ runner.getStartTimeMS(), amContainerResource, params);
if(reservationId != null) {
// if we have a ReservationId, delegate reservation creation to
// AMSim (reservation shape is impl specific)
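For orientation: after this change SLSRunner no longer special-cases map and reduce tasks; every SynthTask produced by the trace becomes one ContainerSimulator, and the AM implementation is selected from the job's own type. Below is a self-contained sketch of the producer-side API touched by this patch; the driver class, trace file name, and output format are illustrative assumptions, not part of the commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;

/** Hypothetical driver: reads a synthetic trace and prints the expanded tasks. */
public class PrintSynthTrace {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Trace path is an assumption; any of the new trace formats should parse.
    SynthTraceJobProducer producer =
        new SynthTraceJobProducer(conf, new Path("syn_generic.json"));
    SynthJob job;
    while ((job = (SynthJob) producer.getNextJob()) != null) {
      // getType() is what SLSRunner now passes to runNewAM();
      // getTasks() replaces the old per-map/per-reduce expansion.
      System.out.println(job.getType() + " job " + job.getJobID()
          + " submit=" + job.getSubmissionTime() + "ms"
          + " deadline=" + job.getDeadline() + "ms");
      for (SynthJob.SynthTask task : job.getTasks()) {
        System.out.print(task); // SynthTask.toString() is one line per task
      }
    }
    producer.close();
  }
}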
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c6adb3d/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
index 5727b5f..bf85fff 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
@@ -121,7 +121,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
List<ContainerSimulator> containerList, ResourceManager resourceManager,
SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
String simQueue, boolean tracked, String oldApp, long baseTimeMS,
- Resource amResource) {
+ Resource amResource, Map<String, String> params) {
super.init(startTime, startTime + 1000000L * heartbeatInterval,
heartbeatInterval);
this.user = simUser;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c6adb3d/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
index 18a155c..6f0f85f 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
@@ -65,6 +65,9 @@ public class MRAMSimulator extends AMSimulator {
scheduled when all maps have finished (not support slow-start currently).
*/
+ public static final String MAP_TYPE = "map";
+ public static final String REDUCE_TYPE = "reduce";
+
private static final int PRIORITY_REDUCE = 10;
private static final int PRIORITY_MAP = 20;
@@ -123,10 +126,10 @@ public class MRAMSimulator extends AMSimulator {
List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
long traceStartTime, long traceFinishTime, String user, String queue,
boolean isTracked, String oldAppId, long baselineStartTimeMS,
- Resource amContainerResource) {
+ Resource amContainerResource, Map<String, String> params) {
super.init(heartbeatInterval, containerList, rm, se,
traceStartTime, traceFinishTime, user, queue, isTracked, oldAppId,
- baselineStartTimeMS, amContainerResource);
+ baselineStartTimeMS, amContainerResource, params);
amtype = "mapreduce";
// get map/reduce tasks
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c6adb3d/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
new file mode 100644
index 0000000..b41f5f2
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.appmaster;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.sls.SLSRunner;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * AMSimulator that simulates streaming services - it keeps tasks
+ * running and resubmits them whenever they fail or complete. It finishes
+ * when the specified duration expires.
+ */
+
+@Private
+@Unstable
+public class StreamAMSimulator extends AMSimulator {
+ /*
+ Vocabulary Used:
+ pending -> requests which are NOT yet sent to RM
+ scheduled -> requests which are sent to RM but not yet assigned
+ assigned -> requests which are assigned to a container
+ completed -> request corresponding to which container has completed
+
+ streams are constantly scheduled. If a streaming job is killed, we restart it
+ */
+
+ private static final int PRIORITY_MAP = 20;
+
+ // pending streams
+ private LinkedList<ContainerSimulator> pendingStreams =
+ new LinkedList<>();
+
+ // scheduled streams
+ private LinkedList<ContainerSimulator> scheduledStreams =
+ new LinkedList<ContainerSimulator>();
+
+ // assigned streams
+ private Map<ContainerId, ContainerSimulator> assignedStreams =
+ new HashMap<ContainerId, ContainerSimulator>();
+
+ // all streams
+ private LinkedList<ContainerSimulator> allStreams =
+ new LinkedList<ContainerSimulator>();
+
+ // finished
+ private boolean isFinished = false;
+ private long duration = 0;
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StreamAMSimulator.class);
+
+ @SuppressWarnings("checkstyle:parameternumber")
+ public void init(int heartbeatInterval,
+ List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
+ long traceStartTime, long traceFinishTime, String user, String queue,
+ boolean isTracked, String oldAppId, long baselineStartTimeMS,
+ Resource amContainerResource, Map<String, String> params) {
+ super.init(heartbeatInterval, containerList, rm, se, traceStartTime,
+ traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS,
+ amContainerResource, params);
+ amtype = "stream";
+
+ allStreams.addAll(containerList);
+
+ duration = traceFinishTime - traceStartTime;
+
+ LOG.info("Added new job with {} streams, running for {}",
+ allStreams.size(), duration);
+ }
+
+ @Override
+ public synchronized void notifyAMContainerLaunched(Container masterContainer)
+ throws Exception {
+ if (null != masterContainer) {
+ restart();
+ super.notifyAMContainerLaunched(masterContainer);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected void processResponseQueue() throws Exception {
+ while (!responseQueue.isEmpty()) {
+ AllocateResponse response = responseQueue.take();
+
+ // check completed containers
+ if (!response.getCompletedContainersStatuses().isEmpty()) {
+ for (ContainerStatus cs : response.getCompletedContainersStatuses()) {
+ ContainerId containerId = cs.getContainerId();
+ if(assignedStreams.containsKey(containerId)){
+ // One of our containers completed. Regardless of reason,
+ // we want to maintain our streaming process
+ LOG.debug("Application {} has one streamer finished ({}).", appId,
+ containerId);
+ pendingStreams.add(assignedStreams.remove(containerId));
+ } else if (amContainer.getId().equals(containerId)){
+ // Our am container completed
+ if(cs.getExitStatus() == ContainerExitStatus.SUCCESS){
+ // am container released event (am container completed on success)
+ isAMContainerRunning = false;
+ isFinished = true;
+ LOG.info("Application {} goes to finish.", appId);
+ } else {
+ // am container killed - wait for re allocation
+ LOG.info("Application {}'s AM is "
+ + "going to be killed. Waiting for rescheduling...", appId);
+ isAMContainerRunning = false;
+ }
+ }
+ }
+ }
+
+ // check finished
+ if (isAMContainerRunning &&
+ (System.currentTimeMillis() - simulateStartTimeMS >= duration)) {
+ LOG.debug("Application {} sends out event to clean up"
+ + " its AM container.", appId);
+ isAMContainerRunning = false;
+ isFinished = true;
+ break;
+ }
+
+ // check allocated containers
+ for (Container container : response.getAllocatedContainers()) {
+ if (!scheduledStreams.isEmpty()) {
+ ContainerSimulator cs = scheduledStreams.remove();
+ LOG.debug("Application {} starts to launch a stream ({}).", appId,
+ container.getId());
+ assignedStreams.put(container.getId(), cs);
+ se.getNmMap().get(container.getNodeId()).addNewContainer(container,
+ cs.getLifeTime());
+ }
+ }
+ }
+ }
+
+ /**
+ * restart running because of the am container killed.
+ */
+ private void restart()
+ throws YarnException, IOException, InterruptedException {
+ // clear
+ isFinished = false;
+ pendingStreams.clear();
+ pendingStreams.addAll(allStreams);
+
+ amContainer = null;
+ }
+
+ private List<ContainerSimulator> mergeLists(List<ContainerSimulator> left,
+ List<ContainerSimulator> right) {
+ List<ContainerSimulator> list = new ArrayList<>();
+ list.addAll(left);
+ list.addAll(right);
+ return list;
+ }
+
+ @Override
+ protected void sendContainerRequest()
+ throws YarnException, IOException, InterruptedException {
+
+ // send out request
+ List<ResourceRequest> ask = new ArrayList<>();
+ List<ContainerId> release = new ArrayList<>();
+ if (!isFinished) {
+ if (!pendingStreams.isEmpty()) {
+ ask = packageRequests(mergeLists(pendingStreams, scheduledStreams),
+ PRIORITY_MAP);
+ LOG.debug("Application {} sends out request for {} streams.",
+ appId, pendingStreams.size());
+ scheduledStreams.addAll(pendingStreams);
+ pendingStreams.clear();
+ }
+ }
+
+ if(isFinished){
+ release.addAll(assignedStreams.keySet());
+ ask.clear();
+ }
+
+ final AllocateRequest request = createAllocateRequest(ask, release);
+ if (totalContainers == 0) {
+ request.setProgress(1.0f);
+ } else {
+ request.setProgress((float) finishedContainers / totalContainers);
+ }
+
+ UserGroupInformation ugi =
+ UserGroupInformation.createRemoteUser(appAttemptId.toString());
+ Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps()
+ .get(appAttemptId.getApplicationId())
+ .getRMAppAttempt(appAttemptId).getAMRMToken();
+ ugi.addTokenIdentifier(token.decodeIdentifier());
+ AllocateResponse response = ugi.doAs(
+ new PrivilegedExceptionAction<AllocateResponse>() {
+ @Override
+ public AllocateResponse run() throws Exception {
+ return rm.getApplicationMasterService().allocate(request);
+ }
+ });
+ if (response != null) {
+ responseQueue.put(response);
+ }
+ }
+
+ @Override
+ public void initReservation(
+ ReservationId reservationId, long deadline, long now){
+ // Streaming AM currently doesn't do reservations
+ setReservationRequest(null);
+ }
+
+ @Override
+ protected void checkStop() {
+ if (isFinished) {
+ super.setEndTime(System.currentTimeMillis());
+ }
+ }
+
+ @Override
+ public void lastStep() throws Exception {
+ super.lastStep();
+
+ // clear data structures
+ allStreams.clear();
+ assignedStreams.clear();
+ pendingStreams.clear();
+ scheduledStreams.clear();
+ responseQueue.clear();
+ }
+}
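The new simulator is only reachable if SLSRunner can resolve a trace job whose type is "stream" to this class. A minimal wiring sketch follows, assuming the yarn.sls.am.type.* convention SLS already uses to register MRAMSimulator; the concrete entries this patch adds to the sls-runner.xml test resource are not shown in this message, so the key and value here are assumptions to be checked against that file.

// Assumed registration (sketch), mirroring the existing mapreduce entry in
// sls-runner.xml; the actual key/value added by this patch may differ.
Configuration conf = new Configuration();
conf.set("yarn.sls.am.type.stream",
    "org.apache.hadoop.yarn.sls.appmaster.StreamAMSimulator");
// SLSRunner builds amClassMap from the yarn.sls.am.type.* entries, so a
// SynthJob whose getType() returns "stream" is instantiated through
// ReflectionUtils.newInstance(amClassMap.get(job.getType()), ...), as shown
// in the SLSRunner hunk earlier in this patch.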
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c6adb3d/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/package-info.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/package-info.java
new file mode 100644
index 0000000..ead315b
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Application Master simulators for the SLS.
+ */
+package org.apache.hadoop.yarn.sls.appmaster;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c6adb3d/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
index 3ed81e1..27156c7 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
@@ -19,19 +19,25 @@ package org.apache.hadoop.yarn.sls.synthetic;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.commons.math3.distribution.LogNormalDistribution;
import org.apache.commons.math3.random.JDKRandomGenerator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskStatus.State;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.tools.rumen.*;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.MapTaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.ReduceTaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.TaskInfo;
import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
+import org.apache.hadoop.yarn.sls.appmaster.MRAMSimulator;
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -46,6 +52,9 @@ public class SynthJob implements JobStory {
@SuppressWarnings("StaticVariableName")
private static Log LOG = LogFactory.getLog(SynthJob.class);
+ private static final long MIN_MEMORY = 1024;
+ private static final long MIN_VCORES = 1;
+
private final Configuration conf;
private final int id;
@@ -53,75 +62,93 @@ public class SynthJob implements JobStory {
private static final AtomicInteger sequence = new AtomicInteger(0);
private final String name;
private final String queueName;
- private final SynthJobClass jobClass;
+ private final SynthTraceJobProducer.JobDefinition jobDef;
+
+ private String type;
// job timing
private final long submitTime;
private final long duration;
private final long deadline;
- private final int numMapTasks;
- private final int numRedTasks;
- private final long mapMaxMemory;
- private final long reduceMaxMemory;
- private final long mapMaxVcores;
- private final long reduceMaxVcores;
- private final long[] mapRuntime;
- private final float[] reduceRuntime;
- private long totMapRuntime;
- private long totRedRuntime;
+ private Map<String, String> params;
+
+ private long totalSlotTime = 0;
+
+ // task information
+ private List<SynthTask> tasks = new ArrayList<>();
+ private Map<String, List<SynthTask>> taskByType = new HashMap<>();
+ private Map<String, Integer> taskCounts = new HashMap<>();
+ private Map<String, Long> taskMemory = new HashMap<>();
+ private Map<String, Long> taskVcores = new HashMap<>();
+
+ /**
+ * Nested class used to represent a task instance in a job. Each task
+ * corresponds to one container allocation for the job.
+ */
+ public static final class SynthTask{
+ private String type;
+ private long time;
+ private long maxMemory;
+ private long maxVcores;
+ private int priority;
+
+ private SynthTask(String type, long time, long maxMemory, long maxVcores,
+ int priority){
+ this.type = type;
+ this.time = time;
+ this.maxMemory = maxMemory;
+ this.maxVcores = maxVcores;
+ this.priority = priority;
+ }
+
+ public String getType(){
+ return type;
+ }
- public SynthJob(JDKRandomGenerator rand, Configuration conf,
- SynthJobClass jobClass, long actualSubmissionTime) {
+ public long getTime(){
+ return time;
+ }
- this.conf = conf;
- this.jobClass = jobClass;
-
- this.duration = MILLISECONDS.convert(jobClass.getDur(), SECONDS);
- this.numMapTasks = jobClass.getMtasks();
- this.numRedTasks = jobClass.getRtasks();
-
- // sample memory distributions, correct for sub-minAlloc sizes
- long tempMapMaxMemory = jobClass.getMapMaxMemory();
- this.mapMaxMemory = tempMapMaxMemory < MRJobConfig.DEFAULT_MAP_MEMORY_MB
- ? MRJobConfig.DEFAULT_MAP_MEMORY_MB : tempMapMaxMemory;
- long tempReduceMaxMemory = jobClass.getReduceMaxMemory();
- this.reduceMaxMemory =
- tempReduceMaxMemory < MRJobConfig.DEFAULT_REDUCE_MEMORY_MB
- ? MRJobConfig.DEFAULT_REDUCE_MEMORY_MB : tempReduceMaxMemory;
-
- // sample vcores distributions, correct for sub-minAlloc sizes
- long tempMapMaxVCores = jobClass.getMapMaxVcores();
- this.mapMaxVcores = tempMapMaxVCores < MRJobConfig.DEFAULT_MAP_CPU_VCORES
- ? MRJobConfig.DEFAULT_MAP_CPU_VCORES : tempMapMaxVCores;
- long tempReduceMaxVcores = jobClass.getReduceMaxVcores();
- this.reduceMaxVcores =
- tempReduceMaxVcores < MRJobConfig.DEFAULT_REDUCE_CPU_VCORES
- ? MRJobConfig.DEFAULT_REDUCE_CPU_VCORES : tempReduceMaxVcores;
-
- if (numMapTasks > 0) {
- conf.setLong(MRJobConfig.MAP_MEMORY_MB, this.mapMaxMemory);
- conf.set(MRJobConfig.MAP_JAVA_OPTS,
- "-Xmx" + (this.mapMaxMemory - 100) + "m");
+ public long getMemory(){
+ return maxMemory;
}
- if (numRedTasks > 0) {
- conf.setLong(MRJobConfig.REDUCE_MEMORY_MB, this.reduceMaxMemory);
- conf.set(MRJobConfig.REDUCE_JAVA_OPTS,
- "-Xmx" + (this.reduceMaxMemory - 100) + "m");
+ public long getVcores(){
+ return maxVcores;
}
- boolean hasDeadline =
- (rand.nextDouble() <= jobClass.jobClass.chance_of_reservation);
+ public int getPriority(){
+ return priority;
+ }
+
+ @Override
+ public String toString(){
+ return String.format("[task]\ttype: %1$-10s\ttime: %2$3s\tmemory: "
+ + "%3$4s\tvcores: %4$2s%n", getType(), getTime(), getMemory(),
+ getVcores());
+ }
+ }
- LogNormalDistribution deadlineFactor =
- SynthUtils.getLogNormalDist(rand, jobClass.jobClass.deadline_factor_avg,
- jobClass.jobClass.deadline_factor_stddev);
- double deadlineFactorSample =
- (deadlineFactor != null) ? deadlineFactor.sample() : -1;
+ protected SynthJob(JDKRandomGenerator rand, Configuration conf,
+ SynthTraceJobProducer.JobDefinition jobDef,
+ String queue, long actualSubmissionTime) {
- this.queueName = jobClass.workload.getQueueName();
+ this.conf = conf;
+ this.jobDef = jobDef;
+
+ this.queueName = queue;
+
+ this.duration = MILLISECONDS.convert(jobDef.duration.getInt(),
+ SECONDS);
+
+ boolean hasDeadline =
+ (rand.nextDouble() <= jobDef.reservation.getDouble());
+
+ double deadlineFactorSample = jobDef.deadline_factor.getDouble();
+
+ this.type = jobDef.type;
this.submitTime = MILLISECONDS.convert(actualSubmissionTime, SECONDS);
@@ -129,6 +156,8 @@ public class SynthJob implements JobStory {
hasDeadline ? MILLISECONDS.convert(actualSubmissionTime, SECONDS)
+ (long) Math.ceil(deadlineFactorSample * duration) : -1;
+ this.params = jobDef.params;
+
conf.set(QUEUE_NAME, queueName);
// name and initialize job randomness
@@ -136,79 +165,166 @@ public class SynthJob implements JobStory {
rand.setSeed(seed);
id = sequence.getAndIncrement();
- name = String.format(jobClass.getClassName() + "_%06d", id);
+ name = String.format(jobDef.class_name + "_%06d", id);
LOG.debug(name + " (" + seed + ")");
LOG.info("JOB TIMING`: job: " + name + " submission:" + submitTime
+ " deadline:" + deadline + " duration:" + duration
+ " deadline-submission: " + (deadline - submitTime));
- // generate map and reduce runtimes
- mapRuntime = new long[numMapTasks];
- for (int i = 0; i < numMapTasks; i++) {
- mapRuntime[i] = jobClass.getMapTimeSample();
- totMapRuntime += mapRuntime[i];
- }
- reduceRuntime = new float[numRedTasks];
- for (int i = 0; i < numRedTasks; i++) {
- reduceRuntime[i] = jobClass.getReduceTimeSample();
- totRedRuntime += (long) Math.ceil(reduceRuntime[i]);
+ // Expand tasks
+ for(SynthTraceJobProducer.TaskDefinition task : jobDef.tasks){
+ int num = task.count.getInt();
+ String taskType = task.type;
+ long memory = task.max_memory.getLong();
+ memory = memory < MIN_MEMORY ? MIN_MEMORY: memory;
+ long vcores = task.max_vcores.getLong();
+ vcores = vcores < MIN_VCORES ? MIN_VCORES : vcores;
+ int priority = task.priority;
+
+ // Save task information by type
+ taskByType.put(taskType, new ArrayList<>());
+ taskCounts.put(taskType, num);
+ taskMemory.put(taskType, memory);
+ taskVcores.put(taskType, vcores);
+
+ for(int i = 0; i < num; ++i){
+ long time = task.time.getLong();
+ totalSlotTime += time;
+ SynthTask t = new SynthTask(taskType, time, memory, vcores,
+ priority);
+ tasks.add(t);
+ taskByType.get(taskType).add(t);
+ }
}
+
+ }
+
+ public String getType(){
+ return type;
+ }
+
+ public List<SynthTask> getTasks(){
+ return tasks;
}
public boolean hasDeadline() {
return deadline > 0;
}
- @Override
public String getName() {
return name;
}
- @Override
public String getUser() {
- return jobClass.getUserName();
+ return jobDef.user_name;
}
- @Override
public JobID getJobID() {
return new JobID("job_mock_" + name, id);
}
+ public long getSubmissionTime() {
+ return submitTime;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
@Override
- public Values getOutcome() {
- return Values.SUCCESS;
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ String res = "\nSynthJob [" + jobDef.class_name + "]: \n"
+ + "\tname: " + getName() + "\n"
+ + "\ttype: " + getType() + "\n"
+ + "\tid: " + id + "\n"
+ + "\tqueue: " + getQueueName() + "\n"
+ + "\tsubmission: " + getSubmissionTime() + "\n"
+ + "\tduration: " + getDuration() + "\n"
+ + "\tdeadline: " + getDeadline() + "\n";
+ sb.append(res);
+ int taskno = 0;
+ for(SynthJob.SynthTask t : getTasks()){
+ sb.append("\t");
+ sb.append(taskno);
+ sb.append(": \t");
+ sb.append(t.toString());
+ taskno++;
+ }
+ return sb.toString();
+ }
+
+ public long getTotalSlotTime() {
+ return totalSlotTime;
+ }
+
+ public long getDuration() {
+ return duration;
+ }
+
+ public long getDeadline() {
+ return deadline;
+ }
+
+ public Map<String, String> getParams() {
+ return params;
}
@Override
- public long getSubmissionTime() {
- return submitTime;
+ public boolean equals(Object other) {
+ if (!(other instanceof SynthJob)) {
+ return false;
+ }
+ SynthJob o = (SynthJob) other;
+ return tasks.equals(o.tasks)
+ && submitTime == o.submitTime
+ && type.equals(o.type)
+ && queueName.equals(o.queueName)
+ && jobDef.class_name.equals(o.jobDef.class_name);
+ }
+
+ @Override
+ public int hashCode() {
+ return jobDef.class_name.hashCode()
+ * (int) submitTime * (int) duration;
+ }
+
+
+ @Override
+ public JobConf getJobConf() {
+ return new JobConf(conf);
}
@Override
public int getNumberMaps() {
- return numMapTasks;
+ return taskCounts.get(MRAMSimulator.MAP_TYPE);
}
@Override
public int getNumberReduces() {
- return numRedTasks;
+ return taskCounts.get(MRAMSimulator.REDUCE_TYPE);
+ }
+
+ @Override
+ public InputSplit[] getInputSplits() {
+ throw new UnsupportedOperationException();
}
@Override
public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
- switch (taskType) {
+ switch(taskType){
case MAP:
- return new TaskInfo(-1, -1, -1, -1, mapMaxMemory, mapMaxVcores);
+ return new TaskInfo(-1, -1, -1, -1,
+ taskMemory.get(MRAMSimulator.MAP_TYPE),
+ taskVcores.get(MRAMSimulator.MAP_TYPE));
case REDUCE:
- return new TaskInfo(-1, -1, -1, -1, reduceMaxMemory, reduceMaxVcores);
+ return new TaskInfo(-1, -1, -1, -1,
+ taskMemory.get(MRAMSimulator.REDUCE_TYPE),
+ taskVcores.get(MRAMSimulator.REDUCE_TYPE));
default:
- throw new IllegalArgumentException("Not interested");
+ break;
}
- }
-
- @Override
- public InputSplit[] getInputSplits() {
throw new UnsupportedOperationException();
}
@@ -218,17 +334,20 @@ public class SynthJob implements JobStory {
switch (taskType) {
case MAP:
return new MapTaskAttemptInfo(State.SUCCEEDED,
- getTaskInfo(taskType, taskNumber), mapRuntime[taskNumber], null);
-
+ getTaskInfo(taskType, taskNumber),
+ taskByType.get(MRAMSimulator.MAP_TYPE).get(taskNumber).time,
+ null);
case REDUCE:
// We assume uniform split between pull/sort/reduce
// aligned with naive progress reporting assumptions
return new ReduceTaskAttemptInfo(State.SUCCEEDED,
getTaskInfo(taskType, taskNumber),
- (long) Math.round((reduceRuntime[taskNumber] / 3)),
- (long) Math.round((reduceRuntime[taskNumber] / 3)),
- (long) Math.round((reduceRuntime[taskNumber] / 3)), null);
-
+ taskByType.get(MRAMSimulator.MAP_TYPE)
+ .get(taskNumber).time / 3,
+ taskByType.get(MRAMSimulator.MAP_TYPE)
+ .get(taskNumber).time / 3,
+ taskByType.get(MRAMSimulator.MAP_TYPE)
+ .get(taskNumber).time / 3, null);
default:
break;
}
@@ -242,65 +361,7 @@ public class SynthJob implements JobStory {
}
@Override
- public org.apache.hadoop.mapred.JobConf getJobConf() {
- return new JobConf(conf);
- }
-
- @Override
- public String getQueueName() {
- return queueName;
- }
-
- @Override
- public String toString() {
- return "SynthJob [\n" + " workload=" + jobClass.getWorkload().getId()
- + "\n" + " jobClass="
- + jobClass.getWorkload().getClassList().indexOf(jobClass) + "\n"
- + " conf=" + conf + ",\n" + " id=" + id + ",\n" + " name=" + name
- + ",\n" + " mapRuntime=" + Arrays.toString(mapRuntime) + ",\n"
- + " reduceRuntime=" + Arrays.toString(reduceRuntime) + ",\n"
- + " submitTime=" + submitTime + ",\n" + " numMapTasks=" + numMapTasks
- + ",\n" + " numRedTasks=" + numRedTasks + ",\n" + " mapMaxMemory="
- + mapMaxMemory + ",\n" + " reduceMaxMemory=" + reduceMaxMemory + ",\n"
- + " queueName=" + queueName + "\n" + "]";
- }
-
- public SynthJobClass getJobClass() {
- return jobClass;
- }
-
- public long getTotalSlotTime() {
- return totMapRuntime + totRedRuntime;
- }
-
- public long getDuration() {
- return duration;
- }
-
- public long getDeadline() {
- return deadline;
- }
-
- @Override
- public boolean equals(Object other) {
- if (!(other instanceof SynthJob)) {
- return false;
- }
- SynthJob o = (SynthJob) other;
- return Arrays.equals(mapRuntime, o.mapRuntime)
- && Arrays.equals(reduceRuntime, o.reduceRuntime)
- && submitTime == o.submitTime && numMapTasks == o.numMapTasks
- && numRedTasks == o.numRedTasks && mapMaxMemory == o.mapMaxMemory
- && reduceMaxMemory == o.reduceMaxMemory
- && mapMaxVcores == o.mapMaxVcores
- && reduceMaxVcores == o.reduceMaxVcores && queueName.equals(o.queueName)
- && jobClass.equals(o.jobClass) && totMapRuntime == o.totMapRuntime
- && totRedRuntime == o.totRedRuntime;
- }
-
- @Override
- public int hashCode() {
- // could have a bad distr; investigate if a relevant use case exists
- return jobClass.hashCode() * (int) submitTime;
+ public Values getOutcome() {
+ return Values.SUCCESS;
}
}
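A few details of the rewritten SynthJob are easy to miss in the hunk above: trace times are given in seconds and converted to milliseconds, per-task memory and vcores are clamped to MIN_MEMORY (1024 MB) and MIN_VCORES (1), and a deadline is only attached when the reservation sample fires. A worked example of the deadline arithmetic, with assumed sample values:

// Illustrative values only (not taken from the shipped traces).
long submitTimeMs = 600 * 1000L;   // actualSubmissionTime sampled as 600 s
long durationMs   = 90 * 1000L;    // jobDef.duration sampled as 90 s
double deadlineFactor = 1.5;       // jobDef.deadline_factor sample
boolean hasDeadline = true;        // rand.nextDouble() <= jobDef.reservation sample
long deadlineMs = hasDeadline
    ? submitTimeMs + (long) Math.ceil(deadlineFactor * durationMs) // 600000 + 135000 = 735000
    : -1L;                         // getDeadline() stays -1, so hasDeadline() is false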
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c6adb3d/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java
deleted file mode 100644
index 439698f..0000000
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.sls.synthetic;
-
-import org.apache.commons.math3.distribution.AbstractRealDistribution;
-import org.apache.commons.math3.distribution.LogNormalDistribution;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.tools.rumen.JobStory;
-import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.JobClass;
-import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.Trace;
-
-/**
- * This is a class that represent a class of Jobs. It is used to generate an
- * individual job, by picking random durations, task counts, container size,
- * etc.
- */
-public class SynthJobClass {
-
- private final JDKRandomGenerator rand;
- private final LogNormalDistribution dur;
- private final LogNormalDistribution mapRuntime;
- private final LogNormalDistribution redRuntime;
- private final LogNormalDistribution mtasks;
- private final LogNormalDistribution rtasks;
- private final LogNormalDistribution mapMem;
- private final LogNormalDistribution redMem;
- private final LogNormalDistribution mapVcores;
- private final LogNormalDistribution redVcores;
-
- private final Trace trace;
- @SuppressWarnings("VisibilityModifier")
- protected final SynthWorkload workload;
- @SuppressWarnings("VisibilityModifier")
- protected final JobClass jobClass;
-
- public SynthJobClass(JDKRandomGenerator rand, Trace trace,
- SynthWorkload workload, int classId) {
-
- this.trace = trace;
- this.workload = workload;
- this.rand = new JDKRandomGenerator();
- this.rand.setSeed(rand.nextLong());
- jobClass = trace.workloads.get(workload.getId()).job_classes.get(classId);
-
- this.dur = SynthUtils.getLogNormalDist(rand, jobClass.dur_avg,
- jobClass.dur_stddev);
- this.mapRuntime = SynthUtils.getLogNormalDist(rand, jobClass.mtime_avg,
- jobClass.mtime_stddev);
- this.redRuntime = SynthUtils.getLogNormalDist(rand, jobClass.rtime_avg,
- jobClass.rtime_stddev);
- this.mtasks = SynthUtils.getLogNormalDist(rand, jobClass.mtasks_avg,
- jobClass.mtasks_stddev);
- this.rtasks = SynthUtils.getLogNormalDist(rand, jobClass.rtasks_avg,
- jobClass.rtasks_stddev);
-
- this.mapMem = SynthUtils.getLogNormalDist(rand, jobClass.map_max_memory_avg,
- jobClass.map_max_memory_stddev);
- this.redMem = SynthUtils.getLogNormalDist(rand,
- jobClass.reduce_max_memory_avg, jobClass.reduce_max_memory_stddev);
- this.mapVcores = SynthUtils.getLogNormalDist(rand,
- jobClass.map_max_vcores_avg, jobClass.map_max_vcores_stddev);
- this.redVcores = SynthUtils.getLogNormalDist(rand,
- jobClass.reduce_max_vcores_avg, jobClass.reduce_max_vcores_stddev);
- }
-
- public JobStory getJobStory(Configuration conf, long actualSubmissionTime) {
- return new SynthJob(rand, conf, this, actualSubmissionTime);
- }
-
- @Override
- public String toString() {
- return "SynthJobClass [workload=" + workload.getName() + ", class="
- + jobClass.class_name + " job_count=" + jobClass.class_weight + ", dur="
- + ((dur != null) ? dur.getNumericalMean() : 0) + ", mapRuntime="
- + ((mapRuntime != null) ? mapRuntime.getNumericalMean() : 0)
- + ", redRuntime="
- + ((redRuntime != null) ? redRuntime.getNumericalMean() : 0)
- + ", mtasks=" + ((mtasks != null) ? mtasks.getNumericalMean() : 0)
- + ", rtasks=" + ((rtasks != null) ? rtasks.getNumericalMean() : 0)
- + ", chance_of_reservation=" + jobClass.chance_of_reservation + "]\n";
-
- }
-
- public double getClassWeight() {
- return jobClass.class_weight;
- }
-
- public long getDur() {
- return genLongSample(dur);
- }
-
- public int getMtasks() {
- return genIntSample(mtasks);
- }
-
- public int getRtasks() {
- return genIntSample(rtasks);
- }
-
- public long getMapMaxMemory() {
- return genLongSample(mapMem);
- }
-
- public long getReduceMaxMemory() {
- return genLongSample(redMem);
- }
-
- public long getMapMaxVcores() {
- return genLongSample(mapVcores);
- }
-
- public long getReduceMaxVcores() {
- return genLongSample(redVcores);
- }
-
- public SynthWorkload getWorkload() {
- return workload;
- }
-
- public int genIntSample(AbstractRealDistribution dist) {
- if (dist == null) {
- return 0;
- }
- double baseSample = dist.sample();
- if (baseSample < 0) {
- baseSample = 0;
- }
- return (int) (Integer.MAX_VALUE & (long) Math.ceil(baseSample));
- }
-
- public long genLongSample(AbstractRealDistribution dist) {
- return dist != null ? (long) Math.ceil(dist.sample()) : 0;
- }
-
- @Override
- public boolean equals(Object other) {
- if (!(other instanceof SynthJobClass)) {
- return false;
- }
- SynthJobClass o = (SynthJobClass) other;
- return workload.equals(o.workload);
- }
-
- @Override
- public int hashCode() {
- return workload.hashCode() * workload.getId();
- }
-
- public String getClassName() {
- return jobClass.class_name;
- }
-
- public long getMapTimeSample() {
- return genLongSample(mapRuntime);
- }
-
- public long getReduceTimeSample() {
- return genLongSample(redRuntime);
- }
-
- public String getUserName() {
- return jobClass.user_name;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c6adb3d/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
index c89e4e2..09bc9b9 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.sls.synthetic;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.commons.math3.distribution.AbstractRealDistribution;
import org.apache.commons.math3.random.JDKRandomGenerator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -26,7 +27,11 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.sls.appmaster.MRAMSimulator;
+import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import javax.xml.bind.annotation.XmlRootElement;
@@ -39,7 +44,7 @@ import static org.codehaus.jackson.map.DeserializationConfig.Feature.FAIL_ON_UNK
/**
* This is a JobStoryProducer that operates from distribution of different
- * workloads. The .json input file is used to determine how many jobs, which
+ * workloads. The .json input file is used to determine how many weight, which
* size, number of maps/reducers and their duration, as well as the temporal
* distributed of submissions. For each parameter we control avg and stdev, and
* generate values via normal or log-normal distributions.
@@ -55,8 +60,6 @@ public class SynthTraceJobProducer implements JobStoryProducer {
private final long seed;
private int totalWeight;
- private final List<Double> weightList;
- private final Map<Integer, SynthWorkload> workloads;
private final Queue<StoryParams> listStoryParams;
@@ -65,6 +68,9 @@ public class SynthTraceJobProducer implements JobStoryProducer {
public static final String SLS_SYNTHETIC_TRACE_FILE =
"sls.synthetic" + ".trace_file";
+ private final static int DEFAULT_MAPPER_PRIORITY = 20;
+ private final static int DEFAULT_REDUCER_PRIORITY = 10;
+
public SynthTraceJobProducer(Configuration conf) throws IOException {
this(conf, new Path(conf.get(SLS_SYNTHETIC_TRACE_FILE)));
}
@@ -76,8 +82,6 @@ public class SynthTraceJobProducer implements JobStoryProducer {
this.conf = conf;
this.rand = new JDKRandomGenerator();
- workloads = new HashMap<Integer, SynthWorkload>();
- weightList = new ArrayList<Double>();
ObjectMapper mapper = new ObjectMapper();
mapper.configure(INTERN_FIELD_NAMES, true);
@@ -86,44 +90,132 @@ public class SynthTraceJobProducer implements JobStoryProducer {
FileSystem ifs = path.getFileSystem(conf);
FSDataInputStream fileIn = ifs.open(path);
+ // Initialize the random generator and the seed
this.trace = mapper.readValue(fileIn, Trace.class);
- seed = trace.rand_seed;
- rand.setSeed(seed);
+ this.seed = trace.rand_seed;
+ this.rand.setSeed(seed);
+ // Initialize the trace
+ this.trace.init(rand);
this.numJobs = new AtomicInteger(trace.num_jobs);
- for (int workloadId = 0; workloadId < trace.workloads
- .size(); workloadId++) {
- SynthWorkload workload = new SynthWorkload(workloadId, trace);
- for (int classId =
- 0; classId < trace.workloads.get(workloadId).job_classes
- .size(); classId++) {
- SynthJobClass cls = new SynthJobClass(rand, trace, workload, classId);
- workload.add(cls);
- }
- workloads.put(workloadId, workload);
+ for (Double w : trace.workload_weights) {
+ totalWeight += w;
}
- for (int i = 0; i < workloads.size(); i++) {
- double w = workloads.get(i).getWorkloadWeight();
- totalWeight += w;
- weightList.add(w);
+ // Initialize our story parameters
+ listStoryParams = createStory();
+
+ LOG.info("Generated " + listStoryParams.size() + " deadlines for "
+ + this.numJobs.get() + " jobs");
+ }
+
+ // StoryParams hold the minimum amount of information needed to completely
+ // specify a job run: job definition, start time, and queue.
+ // This allows us to create "jobs" and then order them according to start time
+ static class StoryParams {
+ // Time the job gets submitted to
+ private long actualSubmissionTime;
+ // The queue the job gets submitted to
+ private String queue;
+ // Definition to construct the job from
+ private JobDefinition jobDef;
+
+ StoryParams(long actualSubmissionTime, String queue, JobDefinition jobDef) {
+ this.actualSubmissionTime = actualSubmissionTime;
+ this.queue = queue;
+ this.jobDef = jobDef;
}
+ }
+
+ private Queue<StoryParams> createStory() {
// create priority queue to keep start-time sorted
- listStoryParams =
- new PriorityQueue<StoryParams>(10, new Comparator<StoryParams>() {
+ Queue<StoryParams> storyQueue =
+ new PriorityQueue<>(this.numJobs.get(), new Comparator<StoryParams>() {
@Override
public int compare(StoryParams o1, StoryParams o2) {
return Math
- .toIntExact(o2.actualSubmissionTime - o1.actualSubmissionTime);
+ .toIntExact(o1.actualSubmissionTime - o2.actualSubmissionTime);
}
});
+ for (int i = 0; i < numJobs.get(); i++) {
+ // Generate a workload
+ Workload wl = trace.generateWorkload();
+ // Save all the parameters needed to completely define a job
+ long actualSubmissionTime = wl.generateSubmissionTime();
+ String queue = wl.queue_name;
+ JobDefinition job = wl.generateJobDefinition();
+ storyQueue.add(new StoryParams(actualSubmissionTime, queue, job));
+ }
+ return storyQueue;
+ }
- // initialize it
- createStoryParams();
- LOG.info("Generated " + listStoryParams.size() + " deadlines for "
- + this.numJobs.get() + " jobs ");
+ @Override
+ public JobStory getNextJob() throws IOException {
+ if (numJobs.decrementAndGet() < 0) {
+ return null;
+ }
+ StoryParams storyParams = listStoryParams.poll();
+ return new SynthJob(rand, conf, storyParams.jobDef, storyParams.queue,
+ storyParams.actualSubmissionTime);
+ }
+
+ @Override
+ public void close(){
+ }
+
+ @Override
+ public String toString() {
+ return "SynthTraceJobProducer [ conf=" + conf + ", numJobs=" + numJobs
+ + ", r=" + rand + ", totalWeight="
+ + totalWeight + ", workloads=" + trace.workloads + "]";
+ }
+
+ public int getNumJobs() {
+ return trace.num_jobs;
+ }
+
+ // Helper to parse and maintain backwards compatibility with
+ // syn json formats
+ private static void validateJobDef(JobDefinition jobDef){
+ if(jobDef.tasks == null) {
+ LOG.info("Detected old JobDefinition format. Converting.");
+ try {
+ jobDef.tasks = new ArrayList<>();
+ jobDef.type = "mapreduce";
+ jobDef.deadline_factor = new Sample(jobDef.deadline_factor_avg,
+ jobDef.deadline_factor_stddev);
+ jobDef.duration = new Sample(jobDef.dur_avg,
+ jobDef.dur_stddev);
+ jobDef.reservation = new Sample(jobDef.chance_of_reservation);
+
+ TaskDefinition map = new TaskDefinition();
+ map.type = MRAMSimulator.MAP_TYPE;
+ map.count = new Sample(jobDef.mtasks_avg, jobDef.mtasks_stddev);
+ map.time = new Sample(jobDef.mtime_avg, jobDef.mtime_stddev);
+ map.max_memory = new Sample((double) jobDef.map_max_memory_avg,
+ jobDef.map_max_memory_stddev);
+ map.max_vcores = new Sample((double) jobDef.map_max_vcores_avg,
+ jobDef.map_max_vcores_stddev);
+ map.priority = DEFAULT_MAPPER_PRIORITY;
+
+ jobDef.tasks.add(map);
+ TaskDefinition reduce = new TaskDefinition();
+ reduce.type = MRAMSimulator.REDUCE_TYPE;
+ reduce.count = new Sample(jobDef.rtasks_avg, jobDef.rtasks_stddev);
+ reduce.time = new Sample(jobDef.rtime_avg, jobDef.rtime_stddev);
+ reduce.max_memory = new Sample((double) jobDef.reduce_max_memory_avg,
+ jobDef.reduce_max_memory_stddev);
+ reduce.max_vcores = new Sample((double) jobDef.reduce_max_vcores_avg,
+ jobDef.reduce_max_vcores_stddev);
+ reduce.priority = DEFAULT_REDUCER_PRIORITY;
+
+ jobDef.tasks.add(reduce);
+ } catch (JsonMappingException e) {
+ LOG.warn("Error converting old JobDefinition format", e);
+ }
+ }
}
public long getSeed() {
@@ -159,6 +251,25 @@ public class SynthTraceJobProducer implements JobStoryProducer {
@JsonProperty("workloads")
List<Workload> workloads;
+ List<Double> workload_weights;
+ JDKRandomGenerator rand;
+
+ public void init(JDKRandomGenerator random){
+ this.rand = random;
+ // Pass rand forward
+ for(Workload w : workloads){
+ w.init(rand);
+ }
+ // Initialize workload weights
+ workload_weights = new ArrayList<>();
+ for(Workload w : workloads){
+ workload_weights.add(w.workload_weight);
+ }
+ }
+
+ Workload generateWorkload(){
+ return workloads.get(SynthUtils.getWeighted(workload_weights, rand));
+ }
}
/**
@@ -174,16 +285,67 @@ public class SynthTraceJobProducer implements JobStoryProducer {
@JsonProperty("queue_name")
String queue_name;
@JsonProperty("job_classes")
- List<JobClass> job_classes;
+ List<JobDefinition> job_classes;
@JsonProperty("time_distribution")
List<TimeSample> time_distribution;
+
+ JDKRandomGenerator rand;
+
+ List<Double> job_weights;
+ List<Double> time_weights;
+
+ public void init(JDKRandomGenerator random){
+ this.rand = random;
+ // Validate and pass rand forward
+ for(JobDefinition def : job_classes){
+ validateJobDef(def);
+ def.init(rand);
+ }
+
+ // Initialize job weights
+ job_weights = new ArrayList<>();
+ job_weights = new ArrayList<>();
+ for(JobDefinition j : job_classes){
+ job_weights.add(j.class_weight);
+ }
+
+ // Initialize time weights
+ time_weights = new ArrayList<>();
+ for(TimeSample ts : time_distribution){
+ time_weights.add(ts.weight);
+ }
+ }
+
+ public long generateSubmissionTime(){
+ int index = SynthUtils.getWeighted(time_weights, rand);
+ // Retrieve the lower and upper bounds for this time "bucket"
+ int start = time_distribution.get(index).time;
+ // Get the beginning of the next time sample (if it exists)
+ index = (index+1)<time_distribution.size() ? index+1 : index;
+ int end = time_distribution.get(index).time;
+ int range = end-start;
+ // Within this time "bucket", uniformly pick a time if our
+ // range is non-zero, otherwise just use the start time of the bucket
+ return start + (range>0 ? rand.nextInt(range) : 0);
+ }
+
+ public JobDefinition generateJobDefinition(){
+ return job_classes.get(SynthUtils.getWeighted(job_weights, rand));
+ }
+
+ @Override
+ public String toString(){
+ return "\nWorkload " + workload_name + ", weight: " + workload_weight
+ + ", queue: " + queue_name + " "
+ + job_classes.toString().replace("\n", "\n\t");
+ }
}
/**
* Class used to parse a job class from file.
*/
@SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" })
- public static class JobClass {
+ public static class JobDefinition {
@JsonProperty("class_name")
String class_name;
@@ -194,6 +356,23 @@ public class SynthTraceJobProducer implements JobStoryProducer {
@JsonProperty("class_weight")
double class_weight;
+ // am type to launch
+ @JsonProperty("type")
+ String type;
+ @JsonProperty("deadline_factor")
+ Sample deadline_factor;
+ @JsonProperty("duration")
+ Sample duration;
+ @JsonProperty("reservation")
+ Sample reservation;
+
+ @JsonProperty("tasks")
+ List<TaskDefinition> tasks;
+
+ @JsonProperty("params")
+ Map<String, String> params;
+
+ // Old JSON fields for backwards compatibility
// reservation related params
@JsonProperty("chance_of_reservation")
double chance_of_reservation;
@@ -246,71 +425,227 @@ public class SynthTraceJobProducer implements JobStoryProducer {
@JsonProperty("reduce_max_vcores_stddev")
double reduce_max_vcores_stddev;
+ public void init(JDKRandomGenerator rand){
+ deadline_factor.init(rand);
+ duration.init(rand);
+ reservation.init(rand);
+
+ for(TaskDefinition t : tasks){
+ t.count.init(rand);
+ t.time.init(rand);
+ t.max_memory.init(rand);
+ t.max_vcores.init(rand);
+ }
+ }
+
+ @Override
+ public String toString(){
+ return "\nJobDefinition " + class_name + ", weight: " + class_weight
+ + ", type: " + type + " "
+ + tasks.toString().replace("\n", "\n\t");
+ }
}
/**
- * This is used to define time-varying probability of a job start-time (e.g.,
- * to simulate daily patterns).
+ * A task representing a type of container, e.g. a "map" task in MapReduce.
*/
@SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" })
- public static class TimeSample {
- // in sec
+ public static class TaskDefinition {
+
+ @JsonProperty("type")
+ String type;
+ @JsonProperty("count")
+ Sample count;
@JsonProperty("time")
- int time;
- @JsonProperty("weight")
- double jobs;
+ Sample time;
+ @JsonProperty("max_memory")
+ Sample max_memory;
+ @JsonProperty("max_vcores")
+ Sample max_vcores;
+ @JsonProperty("priority")
+ int priority;
+
+ @Override
+ public String toString(){
+ return "\nTaskDefinition " + type
+ + " Count[" + count + "] Time[" + time + "] Memory[" + max_memory
+ + "] Vcores[" + max_vcores + "] Priority[" + priority + "]";
+ }
}
- static class StoryParams {
- private SynthJobClass pickedJobClass;
- private long actualSubmissionTime;
+ /**
+ * Class used to parse value sample information.
+ */
+ @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" })
+ public static class Sample {
+ private static final Dist DEFAULT_DIST = Dist.LOGNORM;
+
+ private final double val;
+ private final double std;
+ private final Dist dist;
+ private AbstractRealDistribution dist_instance;
+ private final List<String> discrete;
+ private final List<Double> weights;
+ private final Mode mode;
+
+ private JDKRandomGenerator rand;
+
+ private enum Mode{
+ CONST,
+ DIST,
+ DISC
+ }
- StoryParams(SynthJobClass pickedJobClass, long actualSubmissionTime) {
- this.pickedJobClass = pickedJobClass;
- this.actualSubmissionTime = actualSubmissionTime;
+ private enum Dist{
+ LOGNORM,
+ NORM
}
- }
+ public Sample(Double val) throws JsonMappingException{
+ this(val, null);
+ }
- void createStoryParams() {
+ public Sample(Double val, Double std) throws JsonMappingException{
+ this(val, std, null, null, null);
+ }
- for (int i = 0; i < numJobs.get(); i++) {
- int workload = SynthUtils.getWeighted(weightList, rand);
- SynthWorkload pickedWorkload = workloads.get(workload);
- long jobClass =
- SynthUtils.getWeighted(pickedWorkload.getWeightList(), rand);
- SynthJobClass pickedJobClass =
- pickedWorkload.getClassList().get((int) jobClass);
- long actualSubmissionTime = pickedWorkload.getBaseSubmissionTime(rand);
- // long actualSubmissionTime = (i + 1) * 10;
- listStoryParams
- .add(new StoryParams(pickedJobClass, actualSubmissionTime));
+ @JsonCreator
+ public Sample(@JsonProperty("val") Double val,
+ @JsonProperty("std") Double std, @JsonProperty("dist") String dist,
+ @JsonProperty("discrete") List<String> discrete,
+ @JsonProperty("weights") List<Double> weights)
+ throws JsonMappingException{
+ // Different Modes
+ // - Constant: val must be specified, all else null. Sampling will
+ // return val.
+ // - Distribution: val, std specified, dist optional (defaults to
+ // LogNormal). Sampling will sample from the appropriate distribution
+ // - Discrete: discrete must be set to a list of strings or numbers,
+ // weights optional (defaults to uniform)
+
+ if(val!=null){
+ if(std==null){
+ // Constant
+ if(dist!=null || discrete!=null || weights!=null){
+ throw new JsonMappingException("Instantiation of " + Sample.class
+ + " failed");
+ }
+ mode = Mode.CONST;
+ this.val = val;
+ this.std = 0;
+ this.dist = null;
+ this.discrete = null;
+ this.weights = null;
+ } else {
+ // Distribution
+ if(discrete!=null || weights != null){
+ throw new JsonMappingException("Instantiation of " + Sample.class
+ + " failed");
+ }
+ mode = Mode.DIST;
+ this.val = val;
+ this.std = std;
+ this.dist = dist!=null ? Dist.valueOf(dist) : DEFAULT_DIST;
+ this.discrete = null;
+ this.weights = null;
+ }
+ } else {
+ // Discrete
+ if(discrete==null){
+ throw new JsonMappingException("Instantiation of " + Sample.class
+ + " failed");
+ }
+ mode = Mode.DISC;
+ this.val = 0;
+ this.std = 0;
+ this.dist = null;
+ this.discrete = discrete;
+ if(weights == null){
+ weights = new ArrayList<>(Collections.nCopies(
+ discrete.size(), 1.0));
+ }
+ if(weights.size() != discrete.size()){
+ throw new JsonMappingException("Instantiation of " + Sample.class
+ + " failed");
+ }
+ this.weights = weights;
+ }
}
- }
- @Override
- public JobStory getNextJob() throws IOException {
- if (numJobs.decrementAndGet() < 0) {
- return null;
+ public void init(JDKRandomGenerator random){
+ if(this.rand != null){
+ throw new YarnRuntimeException("init called twice");
+ }
+ this.rand = random;
+ if(mode == Mode.DIST){
+ switch(this.dist){
+ case LOGNORM:
+ this.dist_instance = SynthUtils.getLogNormalDist(rand, val, std);
+ return;
+ case NORM:
+ this.dist_instance = SynthUtils.getNormalDist(rand, val, std);
+ return;
+ default:
+ throw new YarnRuntimeException("Unknown distribution " + dist.name());
+ }
+ }
}
- StoryParams storyParams = listStoryParams.poll();
- return storyParams.pickedJobClass.getJobStory(conf,
- storyParams.actualSubmissionTime);
- }
- @Override
- public void close() {
- }
+ public int getInt(){
+ return Math.toIntExact(getLong());
+ }
- @Override
- public String toString() {
- return "SynthTraceJobProducer [ conf=" + conf + ", numJobs=" + numJobs
- + ", weightList=" + weightList + ", r=" + rand + ", totalWeight="
- + totalWeight + ", workloads=" + workloads + "]";
- }
+ public long getLong(){
+ return Math.round(getDouble());
+ }
+
+ public double getDouble(){
+ return Double.parseDouble(getString());
+ }
+
+ public String getString(){
+ if(this.rand == null){
+ throw new YarnRuntimeException("getValue called without init");
+ }
+ switch(mode){
+ case CONST:
+ return Double.toString(val);
+ case DIST:
+ return Double.toString(dist_instance.sample());
+ case DISC:
+ return this.discrete.get(SynthUtils.getWeighted(this.weights, rand));
+ default:
+ throw new YarnRuntimeException("Unknown sampling mode " + mode.name());
+ }
+ }
+
+ @Override
+ public String toString(){
+ switch(mode){
+ case CONST:
+ return "value: " + Double.toString(val);
+ case DIST:
+ return "value: " + this.val + " std: " + this.std + " dist: "
+ + this.dist.name();
+ case DISC:
+ return "discrete: " + this.discrete + ", weights: " + this.weights;
+ default:
+ throw new YarnRuntimeException("Unknown sampling mode " + mode.name());
+ }
+ }
- public int getNumJobs() {
- return trace.num_jobs;
}
+ /**
+ * This is used to define time-varying probability of a job start-time (e.g.,
+ * to simulate daily patterns).
+ */
+ @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" })
+ public static class TimeSample {
+ // in sec
+ @JsonProperty("time")
+ int time;
+ @JsonProperty("weight")
+ double weight;
+ }
}
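To make the three Sample modes above concrete, here is a minimal sketch (not part of this patch) that exercises them directly in Java. It relies only on the constructors and getters added to SynthTraceJobProducer.Sample in this commit; the class name SampleModesSketch, the seed, and the example values are placeholders chosen for illustration.

import java.util.Arrays;

import org.apache.commons.math3.random.JDKRandomGenerator;
import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.Sample;

public class SampleModesSketch {
  public static void main(String[] args) throws Exception {
    JDKRandomGenerator rand = new JDKRandomGenerator();
    rand.setSeed(42);

    // Constant mode: only "val" is specified; every draw returns 10.
    Sample constant = new Sample(10.0);
    constant.init(rand);

    // Distribution mode: "val" and "std" are specified; with no "dist" given
    // the sampler falls back to the default log-normal distribution.
    Sample lognormal = new Sample(10.0, 2.0);
    lognormal.init(rand);

    // Discrete mode: "val" is omitted and "discrete" lists the values;
    // weights default to uniform when omitted.
    Sample discrete = new Sample(null, null, null,
        Arrays.asList("small", "large"), Arrays.asList(0.8, 0.2));
    discrete.init(rand);

    System.out.println(constant.getLong());   // always 10
    System.out.println(lognormal.getLong());  // varies around 10, per the seed
    System.out.println(discrete.getString()); // "small" roughly 80% of the time
  }
}

The same three shapes apply to the JSON fields parsed by the @JsonCreator constructor (val/std/dist/discrete/weights), which is how task-level quantities such as count, time, max_memory and max_vcores can be specified in the new JSON format.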
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c6adb3d/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java
deleted file mode 100644
index 9e5fd4e..0000000
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.sls.synthetic;
-
-import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.Trace;
-
-import java.util.*;
-
-/**
- * This class represent a workload (made up of multiple SynthJobClass(es)). It
- * also stores the temporal distributions of jobs in this workload.
- */
-public class SynthWorkload {
-
- private final int id;
- private final List<SynthJobClass> classList;
- private final Trace trace;
- private final SortedMap<Integer, Double> timeWeights;
-
- public SynthWorkload(int identifier, Trace inTrace) {
- classList = new ArrayList<SynthJobClass>();
- this.id = identifier;
- this.trace = inTrace;
- timeWeights = new TreeMap<Integer, Double>();
- for (SynthTraceJobProducer.TimeSample ts : trace.workloads
- .get(id).time_distribution) {
- timeWeights.put(ts.time, ts.jobs);
- }
- }
-
- public boolean add(SynthJobClass s) {
- return classList.add(s);
- }
-
- public List<Double> getWeightList() {
- ArrayList<Double> ret = new ArrayList<Double>();
- for (SynthJobClass s : classList) {
- ret.add(s.getClassWeight());
- }
- return ret;
- }
-
- public int getId() {
- return id;
- }
-
- @Override
- public boolean equals(Object other) {
- if (!(other instanceof SynthWorkload)) {
- return false;
- }
- // assume ID determines job classes by construction
- return getId() == ((SynthWorkload) other).getId();
- }
-
- @Override
- public int hashCode() {
- return getId();
- }
-
- @Override
- public String toString() {
- return "SynthWorkload " + trace.workloads.get(id).workload_name + "[\n"
- + classList + "]\n";
- }
-
- public String getName() {
- return trace.workloads.get(id).workload_name;
- }
-
- public double getWorkloadWeight() {
- return trace.workloads.get(id).workload_weight;
- }
-
- public String getQueueName() {
- return trace.workloads.get(id).queue_name;
- }
-
- public long getBaseSubmissionTime(Random rand) {
-
- // pick based on weights the "bucket" for this start time
- int position = SynthUtils.getWeighted(timeWeights.values(), rand);
-
- int[] time = new int[timeWeights.keySet().size()];
- int index = 0;
- for (Integer i : timeWeights.keySet()) {
- time[index++] = i;
- }
-
- // uniformly pick a time between start and end time of this bucket
- int startRange = time[position];
- int endRange = startRange;
- // if there is no subsequent bucket pick startRange
- if (position < timeWeights.keySet().size() - 1) {
- endRange = time[position + 1];
- return startRange + rand.nextInt((endRange - startRange));
- } else {
- return startRange;
- }
- }
-
- public List<SynthJobClass> getClassList() {
- return classList;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c6adb3d/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/BaseSLSRunnerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/BaseSLSRunnerTest.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/BaseSLSRunnerTest.java
index 6b369f2..668be14 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/BaseSLSRunnerTest.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/BaseSLSRunnerTest.java
@@ -125,7 +125,7 @@ public abstract class BaseSLSRunnerTest {
if (!exceptionList.isEmpty()) {
sls.stop();
Assert.fail("TestSLSRunner catched exception from child thread "
- + "(TaskRunner.Task): " + exceptionList);
+ + "(TaskRunner.TaskDefinition): " + exceptionList);
break;
}
timeout--;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c6adb3d/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSGenericSynth.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSGenericSynth.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSGenericSynth.java
new file mode 100644
index 0000000..79ebe21
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSGenericSynth.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * This test performs simple runs of the SLS with the generic syn json format.
+ */
+@RunWith(value = Parameterized.class)
+@NotThreadSafe
+public class TestSLSGenericSynth extends BaseSLSRunnerTest {
+
+ @Parameters(name = "Testing with: {1}, {0}, (nodeFile {3})")
+ public static Collection<Object[]> data() {
+
+ String capScheduler = CapacityScheduler.class.getCanonicalName();
+ String fairScheduler = FairScheduler.class.getCanonicalName();
+ String synthTraceFile = "src/test/resources/syn_generic.json";
+ String nodeFile = "src/test/resources/nodes.json";
+
+ // Test with both schedulers
+ return Arrays.asList(new Object[][] {
+
+ // covering the no nodeFile case
+ {capScheduler, "SYNTH", synthTraceFile, null },
+
+ // covering new commandline and CapacityScheduler
+ {capScheduler, "SYNTH", synthTraceFile, nodeFile },
+
+ // covering FairScheduler
+ {fairScheduler, "SYNTH", synthTraceFile, nodeFile },
+ });
+ }
+
+ @Before
+ public void setup() {
+ ongoingInvariantFile = "src/test/resources/ongoing-invariants.txt";
+ exitInvariantFile = "src/test/resources/exit-invariants.txt";
+ }
+
+ @Test(timeout = 90000)
+ @SuppressWarnings("all")
+ public void testSimulatorRunning() throws Exception {
+ Configuration conf = new Configuration(false);
+ long timeTillShutdownInsec = 20L;
+ runSLS(conf, timeTillShutdownInsec);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c6adb3d/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynth.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynth.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynth.java
new file mode 100644
index 0000000..a5d30e0
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynth.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * This test performs simple runs of the SLS with the stream AM syn json format.
+ */
+@RunWith(value = Parameterized.class)
+@NotThreadSafe
+public class TestSLSStreamAMSynth extends BaseSLSRunnerTest {
+
+ @Parameters(name = "Testing with: {1}, {0}, (nodeFile {3})")
+ public static Collection<Object[]> data() {
+
+ String capScheduler = CapacityScheduler.class.getCanonicalName();
+ String fairScheduler = FairScheduler.class.getCanonicalName();
+ String synthTraceFile = "src/test/resources/syn_stream.json";
+ String nodeFile = "src/test/resources/nodes.json";
+
+ // Test with both schedulers
+ return Arrays.asList(new Object[][] {
+
+ // covering the no nodeFile case
+ {capScheduler, "SYNTH", synthTraceFile, null },
+
+ // covering new commandline and CapacityScheduler
+ {capScheduler, "SYNTH", synthTraceFile, nodeFile },
+
+ // covering FairScheduler
+ {fairScheduler, "SYNTH", synthTraceFile, nodeFile },
+ });
+ }
+
+ @Before
+ public void setup() {
+ ongoingInvariantFile = "src/test/resources/ongoing-invariants.txt";
+ exitInvariantFile = "src/test/resources/exit-invariants.txt";
+ }
+
+ @Test(timeout = 90000)
+ @SuppressWarnings("all")
+ public void testSimulatorRunning() throws Exception {
+ Configuration conf = new Configuration(false);
+ long timeTillShutdownInsec = 20L;
+ runSLS(conf, timeTillShutdownInsec);
+ }
+}