You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bi...@apache.org on 2019/02/14 17:29:15 UTC

[hadoop] branch branch-3.2 updated: YARN-9293. Optimize MockAMLauncher event handling. Contributed by Bibin A Chundatt.

This is an automated email from the ASF dual-hosted git repository.

bibinchundatt pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new f06ac51  YARN-9293. Optimize MockAMLauncher event handling. Contributed by Bibin A Chundatt.
f06ac51 is described below

commit f06ac51c377dc890c0b4680a673332443c78a35d
Author: bibinchundatt <bi...@apache.org>
AuthorDate: Thu Feb 14 22:56:52 2019 +0530

    YARN-9293. Optimize MockAMLauncher event handling. Contributed by Bibin A Chundatt.
    
    (cherry picked from commit 134ae8fc8045e2ae1ed7ca54df95f14ffc863d09)
---
 .../java/org/apache/hadoop/yarn/sls/SLSRunner.java | 11 +++--
 .../hadoop/yarn/sls/appmaster/AMSimulator.java     | 10 +++-
 .../hadoop/yarn/sls/appmaster/MRAMSimulator.java   |  9 ++--
 .../yarn/sls/appmaster/StreamAMSimulator.java      |  5 +-
 .../yarn/sls/resourcemanager/MockAMLauncher.java   | 54 +++++++++++-----------
 .../hadoop/yarn/sls/appmaster/TestAMSimulator.java | 10 ++--
 6 files changed, 57 insertions(+), 42 deletions(-)

diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 1fadd42..b775d8b 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
@@ -113,6 +114,7 @@ public class SLSRunner extends Configured implements Tool {
   // AM simulator
   private int AM_ID;
   private Map<String, AMSimulator> amMap;
+  private Map<ApplicationId, AMSimulator> appIdAMSim;
   private Set<String> trackedApps;
   private Map<String, Class> amClassMap;
   private static int remainingApps = 0;
@@ -170,7 +172,7 @@ public class SLSRunner extends Configured implements Tool {
     queueAppNumMap = new HashMap<>();
     amMap = new ConcurrentHashMap<>();
     amClassMap = new HashMap<>();
-
+    appIdAMSim = new ConcurrentHashMap<>();
     // runner configuration
     setConf(tempConf);
 
@@ -277,7 +279,7 @@ public class SLSRunner extends Configured implements Tool {
     rm = new ResourceManager() {
       @Override
       protected ApplicationMasterLauncher createAMLauncher() {
-        return new MockAMLauncher(se, this.rmContext, amMap);
+        return new MockAMLauncher(se, this.rmContext, appIdAMSim);
       }
     };
 
@@ -587,7 +589,7 @@ public class SLSRunner extends Configured implements Tool {
         try {
           createAMForJob(job, baselineTimeMS);
         } catch (Exception e) {
-          LOG.error("Failed to create an AM: {}", e.getMessage());
+          LOG.error("Failed to create an AM", e);
         }
 
         job = reader.getNext();
@@ -808,7 +810,8 @@ public class SLSRunner extends Configured implements Tool {
       AM_ID++;
       amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS,
           jobFinishTimeMS, user, jobQueue, isTracked, oldJobId,
-          runner.getStartTimeMS(), amContainerResource, labelExpr, params);
+          runner.getStartTimeMS(), amContainerResource, labelExpr, params,
+          appIdAMSim);
       if(reservationId != null) {
         // if we have a ReservationId, delegate reservation creation to
         // AMSim (reservation shape is impl specific)
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
index 5f34cfc..ac83ab2 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
@@ -116,6 +116,8 @@ public abstract class AMSimulator extends TaskRunner.Task {
 
   private ReservationSubmissionRequest reservationRequest;
 
+  private Map<ApplicationId, AMSimulator> appIdToAMSim;
+
   public AMSimulator() {
     this.responseQueue = new LinkedBlockingQueue<>();
   }
@@ -125,8 +127,8 @@ public abstract class AMSimulator extends TaskRunner.Task {
       List<ContainerSimulator> containerList, ResourceManager resourceManager,
       SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
       String simQueue, boolean tracked, String oldApp, long baseTimeMS,
-      Resource amResource, String nodeLabelExpr,
-      Map<String, String> params) {
+      Resource amResource, String nodeLabelExpr, Map<String, String> params,
+      Map<ApplicationId, AMSimulator> appIdAMSim) {
     super.init(startTime, startTime + 1000000L * heartbeatInterval,
         heartbeatInterval);
     this.user = simUser;
@@ -140,6 +142,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
     this.traceFinishTimeMS = finishTime;
     this.amContainerResource = amResource;
     this.nodeLabelExpression = nodeLabelExpr;
+    this.appIdToAMSim = appIdAMSim;
   }
 
   /**
@@ -163,6 +166,9 @@ public abstract class AMSimulator extends TaskRunner.Task {
     // submit application, waiting until ACCEPTED
     submitApp(reservationId);
 
+    // add submitted app to mapping
+    appIdToAMSim.put(appId, this);
+
     // track app metrics
     trackApp();
   }
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
index 71fc5b2..586c671 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -127,10 +128,10 @@ public class MRAMSimulator extends AMSimulator {
       long traceStartTime, long traceFinishTime, String user, String queue,
       boolean isTracked, String oldAppId, long baselineStartTimeMS,
       Resource amContainerResource, String nodeLabelExpr,
-      Map<String, String> params) {
-    super.init(heartbeatInterval, containerList, rm, se,
-        traceStartTime, traceFinishTime, user, queue, isTracked, oldAppId,
-        baselineStartTimeMS, amContainerResource, nodeLabelExpr, params);
+      Map<String, String> params, Map<ApplicationId, AMSimulator> appIdAMSim) {
+    super.init(heartbeatInterval, containerList, rm, se, traceStartTime,
+        traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS,
+        amContainerResource, nodeLabelExpr, params, appIdAMSim);
     amtype = "mapreduce";
 
     // get map/reduce tasks
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
index 862e5ec..46bc90a 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -97,10 +98,10 @@ public class StreamAMSimulator extends AMSimulator {
       long traceStartTime, long traceFinishTime, String user, String queue,
       boolean isTracked, String oldAppId, long baselineStartTimeMS,
       Resource amContainerResource, String nodeLabelExpr,
-      Map<String, String> params) {
+      Map<String, String> params, Map<ApplicationId, AMSimulator> appIdAMSim) {
     super.init(heartbeatInterval, containerList, rm, se, traceStartTime,
         traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS,
-        amContainerResource, nodeLabelExpr, params);
+        amContainerResource, nodeLabelExpr, params, appIdAMSim);
     amtype = "stream";
 
     allStreams.addAll(containerList);
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
index 9fb83ec..208629a 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
@@ -45,13 +45,14 @@ public class MockAMLauncher extends ApplicationMasterLauncher
   private static final Logger LOG = LoggerFactory.getLogger(
       MockAMLauncher.class);
 
-  Map<String, AMSimulator> amMap;
+  private Map<ApplicationId, AMSimulator> appIdAMSim;
+
   SLSRunner se;
 
   public MockAMLauncher(SLSRunner se, RMContext rmContext,
-      Map<String, AMSimulator> amMap) {
+      Map<ApplicationId, AMSimulator> appIdAMSim) {
     super(rmContext);
-    this.amMap = amMap;
+    this.appIdAMSim = appIdAMSim;
     this.se = se;
   }
 
@@ -86,30 +87,28 @@ public class MockAMLauncher extends ApplicationMasterLauncher
           event.getAppAttempt().getAppAttemptId().getApplicationId();
 
       // find AMSimulator
-      for (AMSimulator ams : amMap.values()) {
-        if (ams.getApplicationId() != null && ams.getApplicationId().equals(
-            appId)) {
-          try {
-            Container amContainer = event.getAppAttempt().getMasterContainer();
-
-            setupAMRMToken(event.getAppAttempt());
-
-            // Notify RMAppAttempt to change state
-            super.context.getDispatcher().getEventHandler().handle(
-                new RMAppAttemptEvent(event.getAppAttempt().getAppAttemptId(),
-                    RMAppAttemptEventType.LAUNCHED));
-
-            ams.notifyAMContainerLaunched(
-                event.getAppAttempt().getMasterContainer());
-            LOG.info("Notify AM launcher launched:" + amContainer.getId());
-
-            se.getNmMap().get(amContainer.getNodeId())
-                .addNewContainer(amContainer, 100000000L);
-
-            return;
-          } catch (Exception e) {
-            throw new YarnRuntimeException(e);
-          }
+      AMSimulator ams = appIdAMSim.get(appId);
+      if (ams != null) {
+        try {
+          Container amContainer = event.getAppAttempt().getMasterContainer();
+
+          setupAMRMToken(event.getAppAttempt());
+
+          // Notify RMAppAttempt to change state
+          super.context.getDispatcher().getEventHandler().handle(
+              new RMAppAttemptEvent(event.getAppAttempt().getAppAttemptId(),
+                  RMAppAttemptEventType.LAUNCHED));
+
+          ams.notifyAMContainerLaunched(
+              event.getAppAttempt().getMasterContainer());
+          LOG.info("Notify AM launcher launched:" + amContainer.getId());
+
+          se.getNmMap().get(amContainer.getNodeId())
+              .addNewContainer(amContainer, 100000000L);
+
+          return;
+        } catch (Exception e) {
+          throw new YarnRuntimeException(e);
         }
       }
 
@@ -117,4 +116,5 @@ public class MockAMLauncher extends ApplicationMasterLauncher
           "Didn't find any AMSimulator for applicationId=" + appId);
     }
   }
+
 }
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
index 2efa846..cef41d6 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.sls.appmaster;
 
 import com.codahale.metrics.MetricRegistry;
+import java.util.HashMap;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
@@ -144,8 +145,10 @@ public class TestAMSimulator {
     String appId = "app1";
     String queue = "default";
     List<ContainerSimulator> containers = new ArrayList<>();
+    HashMap<ApplicationId, AMSimulator> map = new HashMap<>();
     app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true,
-        appId, 0, SLSConfiguration.getAMContainerResource(conf), null, null);
+        appId, 0, SLSConfiguration.getAMContainerResource(conf), null, null,
+        map);
     app.firstStep();
 
     verifySchedulerMetrics(appId);
@@ -169,9 +172,10 @@ public class TestAMSimulator {
       String appId = "app1";
       String queue = "default";
       List<ContainerSimulator> containers = new ArrayList<>();
+      HashMap<ApplicationId, AMSimulator> map = new HashMap<>();
       app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true,
-          appId, 0, SLSConfiguration.getAMContainerResource(conf),
-          "label1", null);
+          appId, 0, SLSConfiguration.getAMContainerResource(conf), "label1",
+          null, map);
       app.firstStep();
 
       verifySchedulerMetrics(appId);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org