You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bi...@apache.org on 2019/02/14 17:27:37 UTC
[hadoop] branch trunk updated: YARN-9293. Optimize MockAMLauncher event handling. Contributed by Bibin A Chundatt.
This is an automated email from the ASF dual-hosted git repository.
bibinchundatt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 134ae8f YARN-9293. Optimize MockAMLauncher event handling. Contributed by Bibin A Chundatt.
134ae8f is described below
commit 134ae8fc8045e2ae1ed7ca54df95f14ffc863d09
Author: bibinchundatt <bi...@apache.org>
AuthorDate: Thu Feb 14 22:56:52 2019 +0530
YARN-9293. Optimize MockAMLauncher event handling. Contributed by Bibin A Chundatt.
---
.../java/org/apache/hadoop/yarn/sls/SLSRunner.java | 11 +++--
.../hadoop/yarn/sls/appmaster/AMSimulator.java | 10 +++-
.../hadoop/yarn/sls/appmaster/MRAMSimulator.java | 9 ++--
.../yarn/sls/appmaster/StreamAMSimulator.java | 5 +-
.../yarn/sls/resourcemanager/MockAMLauncher.java | 54 +++++++++++-----------
.../hadoop/yarn/sls/appmaster/TestAMSimulator.java | 10 ++--
6 files changed, 57 insertions(+), 42 deletions(-)
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 1fadd42..b775d8b 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
@@ -113,6 +114,7 @@ public class SLSRunner extends Configured implements Tool {
// AM simulator
private int AM_ID;
private Map<String, AMSimulator> amMap;
+ private Map<ApplicationId, AMSimulator> appIdAMSim;
private Set<String> trackedApps;
private Map<String, Class> amClassMap;
private static int remainingApps = 0;
@@ -170,7 +172,7 @@ public class SLSRunner extends Configured implements Tool {
queueAppNumMap = new HashMap<>();
amMap = new ConcurrentHashMap<>();
amClassMap = new HashMap<>();
-
+ appIdAMSim = new ConcurrentHashMap<>();
// runner configuration
setConf(tempConf);
@@ -277,7 +279,7 @@ public class SLSRunner extends Configured implements Tool {
rm = new ResourceManager() {
@Override
protected ApplicationMasterLauncher createAMLauncher() {
- return new MockAMLauncher(se, this.rmContext, amMap);
+ return new MockAMLauncher(se, this.rmContext, appIdAMSim);
}
};
@@ -587,7 +589,7 @@ public class SLSRunner extends Configured implements Tool {
try {
createAMForJob(job, baselineTimeMS);
} catch (Exception e) {
- LOG.error("Failed to create an AM: {}", e.getMessage());
+ LOG.error("Failed to create an AM", e);
}
job = reader.getNext();
@@ -808,7 +810,8 @@ public class SLSRunner extends Configured implements Tool {
AM_ID++;
amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS,
jobFinishTimeMS, user, jobQueue, isTracked, oldJobId,
- runner.getStartTimeMS(), amContainerResource, labelExpr, params);
+ runner.getStartTimeMS(), amContainerResource, labelExpr, params,
+ appIdAMSim);
if(reservationId != null) {
// if we have a ReservationId, delegate reservation creation to
// AMSim (reservation shape is impl specific)
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
index 5f34cfc..ac83ab2 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
@@ -116,6 +116,8 @@ public abstract class AMSimulator extends TaskRunner.Task {
private ReservationSubmissionRequest reservationRequest;
+ private Map<ApplicationId, AMSimulator> appIdToAMSim;
+
public AMSimulator() {
this.responseQueue = new LinkedBlockingQueue<>();
}
@@ -125,8 +127,8 @@ public abstract class AMSimulator extends TaskRunner.Task {
List<ContainerSimulator> containerList, ResourceManager resourceManager,
SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
String simQueue, boolean tracked, String oldApp, long baseTimeMS,
- Resource amResource, String nodeLabelExpr,
- Map<String, String> params) {
+ Resource amResource, String nodeLabelExpr, Map<String, String> params,
+ Map<ApplicationId, AMSimulator> appIdAMSim) {
super.init(startTime, startTime + 1000000L * heartbeatInterval,
heartbeatInterval);
this.user = simUser;
@@ -140,6 +142,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
this.traceFinishTimeMS = finishTime;
this.amContainerResource = amResource;
this.nodeLabelExpression = nodeLabelExpr;
+ this.appIdToAMSim = appIdAMSim;
}
/**
@@ -163,6 +166,9 @@ public abstract class AMSimulator extends TaskRunner.Task {
// submit application, waiting until ACCEPTED
submitApp(reservationId);
+ // add submitted app to mapping
+ appIdToAMSim.put(appId, this);
+
// track app metrics
trackApp();
}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
index 71fc5b2..586c671 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -127,10 +128,10 @@ public class MRAMSimulator extends AMSimulator {
long traceStartTime, long traceFinishTime, String user, String queue,
boolean isTracked, String oldAppId, long baselineStartTimeMS,
Resource amContainerResource, String nodeLabelExpr,
- Map<String, String> params) {
- super.init(heartbeatInterval, containerList, rm, se,
- traceStartTime, traceFinishTime, user, queue, isTracked, oldAppId,
- baselineStartTimeMS, amContainerResource, nodeLabelExpr, params);
+ Map<String, String> params, Map<ApplicationId, AMSimulator> appIdAMSim) {
+ super.init(heartbeatInterval, containerList, rm, se, traceStartTime,
+ traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS,
+ amContainerResource, nodeLabelExpr, params, appIdAMSim);
amtype = "mapreduce";
// get map/reduce tasks
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
index 862e5ec..46bc90a 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -97,10 +98,10 @@ public class StreamAMSimulator extends AMSimulator {
long traceStartTime, long traceFinishTime, String user, String queue,
boolean isTracked, String oldAppId, long baselineStartTimeMS,
Resource amContainerResource, String nodeLabelExpr,
- Map<String, String> params) {
+ Map<String, String> params, Map<ApplicationId, AMSimulator> appIdAMSim) {
super.init(heartbeatInterval, containerList, rm, se, traceStartTime,
traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS,
- amContainerResource, nodeLabelExpr, params);
+ amContainerResource, nodeLabelExpr, params, appIdAMSim);
amtype = "stream";
allStreams.addAll(containerList);
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
index 9fb83ec..208629a 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
@@ -45,13 +45,14 @@ public class MockAMLauncher extends ApplicationMasterLauncher
private static final Logger LOG = LoggerFactory.getLogger(
MockAMLauncher.class);
- Map<String, AMSimulator> amMap;
+ private Map<ApplicationId, AMSimulator> appIdAMSim;
+
SLSRunner se;
public MockAMLauncher(SLSRunner se, RMContext rmContext,
- Map<String, AMSimulator> amMap) {
+ Map<ApplicationId, AMSimulator> appIdAMSim) {
super(rmContext);
- this.amMap = amMap;
+ this.appIdAMSim = appIdAMSim;
this.se = se;
}
@@ -86,30 +87,28 @@ public class MockAMLauncher extends ApplicationMasterLauncher
event.getAppAttempt().getAppAttemptId().getApplicationId();
// find AMSimulator
- for (AMSimulator ams : amMap.values()) {
- if (ams.getApplicationId() != null && ams.getApplicationId().equals(
- appId)) {
- try {
- Container amContainer = event.getAppAttempt().getMasterContainer();
-
- setupAMRMToken(event.getAppAttempt());
-
- // Notify RMAppAttempt to change state
- super.context.getDispatcher().getEventHandler().handle(
- new RMAppAttemptEvent(event.getAppAttempt().getAppAttemptId(),
- RMAppAttemptEventType.LAUNCHED));
-
- ams.notifyAMContainerLaunched(
- event.getAppAttempt().getMasterContainer());
- LOG.info("Notify AM launcher launched:" + amContainer.getId());
-
- se.getNmMap().get(amContainer.getNodeId())
- .addNewContainer(amContainer, 100000000L);
-
- return;
- } catch (Exception e) {
- throw new YarnRuntimeException(e);
- }
+ AMSimulator ams = appIdAMSim.get(appId);
+ if (ams != null) {
+ try {
+ Container amContainer = event.getAppAttempt().getMasterContainer();
+
+ setupAMRMToken(event.getAppAttempt());
+
+ // Notify RMAppAttempt to change state
+ super.context.getDispatcher().getEventHandler().handle(
+ new RMAppAttemptEvent(event.getAppAttempt().getAppAttemptId(),
+ RMAppAttemptEventType.LAUNCHED));
+
+ ams.notifyAMContainerLaunched(
+ event.getAppAttempt().getMasterContainer());
+ LOG.info("Notify AM launcher launched:" + amContainer.getId());
+
+ se.getNmMap().get(amContainer.getNodeId())
+ .addNewContainer(amContainer, 100000000L);
+
+ return;
+ } catch (Exception e) {
+ throw new YarnRuntimeException(e);
}
}
@@ -117,4 +116,5 @@ public class MockAMLauncher extends ApplicationMasterLauncher
"Didn't find any AMSimulator for applicationId=" + appId);
}
}
+
}
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
index 2efa846..cef41d6 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.sls.appmaster;
import com.codahale.metrics.MetricRegistry;
+import java.util.HashMap;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
@@ -144,8 +145,10 @@ public class TestAMSimulator {
String appId = "app1";
String queue = "default";
List<ContainerSimulator> containers = new ArrayList<>();
+ HashMap<ApplicationId, AMSimulator> map = new HashMap<>();
app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true,
- appId, 0, SLSConfiguration.getAMContainerResource(conf), null, null);
+ appId, 0, SLSConfiguration.getAMContainerResource(conf), null, null,
+ map);
app.firstStep();
verifySchedulerMetrics(appId);
@@ -169,9 +172,10 @@ public class TestAMSimulator {
String appId = "app1";
String queue = "default";
List<ContainerSimulator> containers = new ArrayList<>();
+ HashMap<ApplicationId, AMSimulator> map = new HashMap<>();
app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true,
- appId, 0, SLSConfiguration.getAMContainerResource(conf),
- "label1", null);
+ appId, 0, SLSConfiguration.getAMContainerResource(conf), "label1",
+ null, map);
app.firstStep();
verifySchedulerMetrics(appId);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org