Posted to common-commits@hadoop.apache.org by gt...@apache.org on 2016/01/20 10:14:04 UTC

[47/50] [abbrv] hadoop git commit: YARN-3586. RM to only get back addresses of Collectors that NM needs to know. (Junping Du via Varun Saxena).

YARN-3586. RM to only get back addresses of Collectors that NM needs to know.
(Junping Du via Varun Saxena).


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0352b974
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0352b974
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0352b974

Branch: refs/heads/feature-YARN-2928
Commit: 0352b97430604fce39d06c578bd1d471b02556af
Parents: 41b9f27
Author: Varun Saxena <va...@apache.org>
Authored: Tue Dec 22 20:58:54 2015 +0530
Committer: Li Lu <gt...@apache.org>
Committed: Tue Jan 19 18:03:31 2016 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../resourcemanager/ResourceTrackerService.java | 30 +++----
 .../TestResourceTrackerService.java             | 82 ++++++++++++++++++++
 3 files changed, 100 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
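In short, the heartbeat path changes from returning collector addresses for every application
the RM knows about to returning only the addresses for applications actually running on the
heartbeating node. A rough sketch of the new control flow, simplified from the full
ResourceTrackerService diff below (only the timeline v2 branch is shown; everything else elided):

    // Simplified sketch of the new behaviour; see the ResourceTrackerService diff below.
    if (timelineV2Enabled) {
      // Only include collectors for apps currently running on this node,
      // instead of iterating over every app in rmContext.getRMApps().
      setAppCollectorsMapToResponse(rmNode.getRunningApps(), nodeHeartBeatResponse);
    }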


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0352b974/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index fbd40ba..5bac262 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -155,6 +155,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-4445. Unify the term flowId and flowName in timeline v2 codebase. 
     (Zhan Zhang via gtcarrera9). 
 
+    YARN-3586. RM to only get back addresses of Collectors that NM needs to know.
+    (Junping Du via varunsaxena)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0352b974/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index b386f0a..1d9433c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
@@ -477,16 +476,15 @@ public class ResourceTrackerService extends AbstractService implements
       nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
     }
 
-    List<ApplicationId> keepAliveApps =
-        remoteNodeStatus.getKeepAliveApplications();
-    if (timelineV2Enabled && keepAliveApps != null) {
+    if (timelineV2Enabled) {
       // Return collectors' map that NM needs to know
-      // TODO we should optimize this to only include collector info that NM
-      // doesn't know yet.
-      setAppCollectorsMapToResponse(keepAliveApps, nodeHeartBeatResponse);
+      setAppCollectorsMapToResponse(rmNode.getRunningApps(),
+          nodeHeartBeatResponse);
     }
 
     // 4. Send status to RMNode, saving the latest response.
+    List<ApplicationId> keepAliveApps =
+        remoteNodeStatus.getKeepAliveApplications();
     RMNodeStatusEvent nodeStatusEvent =
         new RMNodeStatusEvent(nodeId, remoteNodeStatus, nodeHeartBeatResponse);
     if (request.getLogAggregationReportsForApps() != null
@@ -514,18 +512,20 @@ public class ResourceTrackerService extends AbstractService implements
   }
 
   private void setAppCollectorsMapToResponse(
-      List<ApplicationId> liveApps, NodeHeartbeatResponse response) {
+      List<ApplicationId> runningApps, NodeHeartbeatResponse response) {
     Map<ApplicationId, String> liveAppCollectorsMap = new
-        ConcurrentHashMap<ApplicationId, String>();
+        HashMap<ApplicationId, String>();
     Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps();
-    // Set collectors for all apps now.
-    // TODO set collectors for only active apps running on NM (liveApps cannot be
-    // used for this case)
-    for (Map.Entry<ApplicationId, RMApp> rmApp : rmApps.entrySet()) {
-      ApplicationId appId = rmApp.getKey();
-      String appCollectorAddr = rmApp.getValue().getCollectorAddr();
+    // Set collectors for all running apps on this node.
+    for (ApplicationId appId : runningApps) {
+      String appCollectorAddr = rmApps.get(appId).getCollectorAddr();
       if (appCollectorAddr != null) {
         liveAppCollectorsMap.put(appId, appCollectorAddr);
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Collector for applicaton: " + appId +
+              " hasn't registered yet!");
+        }
       }
     }
     response.setAppCollectorsMap(liveAppCollectorsMap);

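On the NodeManager side (not touched by this patch), the heartbeat response now carries only the
collectors the node actually needs. A hypothetical consumer sketch, assuming only the
NodeHeartbeatResponse#getAppCollectorsMap() accessor that the new test below exercises;
registerCollectorAddress() is an illustrative placeholder, not an existing NM method:

    // Hypothetical NM-side consumer; only getAppCollectorsMap() comes from this patch's test.
    Map<ApplicationId, String> collectors = response.getAppCollectorsMap();
    for (Map.Entry<ApplicationId, String> entry : collectors.entrySet()) {
      // Each entry maps an app running on this node to its collector address (host:port).
      registerCollectorAddress(entry.getKey(), entry.getValue()); // placeholder helper
    }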
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0352b974/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index e42ed91..b0e6c55 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
@@ -60,6 +61,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequ
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
@@ -67,8 +69,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
@@ -869,6 +874,83 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     checkRebootedNMCount(rm, ++initialMetricCount);
   }
 
+  @Test
+  public void testNodeHeartbeatForAppCollectorsMap() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    // set version to 2
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+    // enable aux-service based timeline collectors
+    conf.set(YarnConfiguration.NM_AUX_SERVICES, "timeline_collector");
+    conf.set(YarnConfiguration.NM_AUX_SERVICES + "."
+        + "timeline_collector" + ".class",
+        PerNodeTimelineCollectorsAuxService.class.getName());
+
+    rm = new MockRM(conf);
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("host1:1234", 5120);
+    MockNM nm2 = rm.registerNode("host2:1234", 2048);
+
+    NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true);
+    NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true);
+
+    RMNodeImpl node1 =
+        (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    RMNodeImpl node2 =
+        (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    RMApp app1 = rm.submitApp(1024);
+    String collectorAddr1 = "1.2.3.4:5";
+    app1.setCollectorAddr(collectorAddr1);
+
+    String collectorAddr2 = "5.4.3.2:1";
+    RMApp app2 = rm.submitApp(1024);
+    app2.setCollectorAddr(collectorAddr2);
+
+    // Create a running container for app1 running on nm1
+    ContainerId runningContainerId1 = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+        app1.getApplicationId(), 0), 0);
+
+    ContainerStatus status1 = ContainerStatus.newInstance(runningContainerId1,
+        ContainerState.RUNNING, "", 0);
+    List<ContainerStatus> statusList = new ArrayList<ContainerStatus>();
+    statusList.add(status1);
+    NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(true,
+        "", System.currentTimeMillis());
+    node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeHealth,
+        statusList, null, nodeHeartbeat1));
+
+    Assert.assertEquals(1, node1.getRunningApps().size());
+    Assert.assertEquals(app1.getApplicationId(), node1.getRunningApps().get(0));
+
+    // Create a running container for app2 running on nm2
+    ContainerId runningContainerId2 = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+        app2.getApplicationId(), 0), 0);
+
+    ContainerStatus status2 = ContainerStatus.newInstance(runningContainerId2,
+        ContainerState.RUNNING, "", 0);
+    statusList = new ArrayList<ContainerStatus>();
+    statusList.add(status2);
+    node2.handle(new RMNodeStatusEvent(nm2.getNodeId(), nodeHealth,
+        statusList, null, nodeHeartbeat2));
+    Assert.assertEquals(1, node2.getRunningApps().size());
+    Assert.assertEquals(app2.getApplicationId(), node2.getRunningApps().get(0));
+
+    nodeHeartbeat1 = nm1.nodeHeartbeat(true);
+    Map<ApplicationId, String> map1 = nodeHeartbeat1.getAppCollectorsMap();
+    Assert.assertEquals(1, map1.size());
+    Assert.assertEquals(collectorAddr1, map1.get(app1.getApplicationId()));
+
+    nodeHeartbeat2 = nm2.nodeHeartbeat(true);
+    Map<ApplicationId, String> map2 = nodeHeartbeat2.getAppCollectorsMap();
+    Assert.assertEquals(1, map2.size());
+    Assert.assertEquals(collectorAddr2, map2.get(app2.getApplicationId()));
+  }
+
   private void checkRebootedNMCount(MockRM rm2, int count)
       throws InterruptedException {