You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sj...@apache.org on 2016/10/14 21:40:26 UTC

[1/2] hadoop git commit: YARN-5638. Introduce a collector timestamp to uniquely identify collectors creation order in collector discovery. Contributed by Li Lu.

Repository: hadoop
Updated Branches:
  refs/heads/YARN-5355 5d7ad396d -> 0c1863144


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index 98cbd92..c405a8d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -22,6 +22,8 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.ipc.CallerContext;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -36,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
@@ -177,27 +180,16 @@ public interface RMApp extends EventHandler<RMAppEvent> {
   String getTrackingUrl();
 
   /**
-   * The collector address for the application. It should be used only if the
-   * timeline service v.2 is enabled.
+   * The timeline collector information for the application. It should be used
+   * only if the timeline service v.2 is enabled.
    *
-   * @return the address for the application's collector, or null if the
-   * timeline service v.2 is not enabled.
+   * @return the data for the application's collector, including collector
+   * address, collector ID. Return null if the timeline service v.2 is not
+   * enabled.
    */
-  String getCollectorAddr();
-
-  /**
-   * Set collector address for the application. It should be used only if the
-   * timeline service v.2 is enabled.
-   *
-   * @param collectorAddr the address of collector
-   */
-  void setCollectorAddr(String collectorAddr);
-
-  /**
-   * Remove collector address when application is finished or killed. It should
-   * be used only if the timeline service v.2 is enabled.
-   */
-  void removeCollectorAddr();
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  AppCollectorData getCollectorData();
 
   /**
    * The original tracking url for the application master.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java
deleted file mode 100644
index 9642911..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
-
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-
-/**
- * Event used for updating collector address in RMApp on node heartbeat.
- */
-public class RMAppCollectorUpdateEvent extends RMAppEvent {
-
-  private final String appCollectorAddr;
-
-  public RMAppCollectorUpdateEvent(ApplicationId appId,
-      String appCollectorAddr) {
-    super(appId, RMAppEventType.COLLECTOR_UPDATE);
-    this.appCollectorAddr = appCollectorAddr;
-  }
-
-  public String getAppCollectorAddr(){
-    return this.appCollectorAddr;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
index 2b42638..668c5e1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
@@ -31,9 +31,6 @@ public enum RMAppEventType {
   // Source: Scheduler
   APP_ACCEPTED,
 
-  // TODO add source later
-  COLLECTOR_UPDATE,
-
   // Source: RMAppAttempt
   ATTEMPT_REGISTERED,
   ATTEMPT_UNREGISTERED,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 45ff79c..7fb11f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
@@ -153,7 +154,7 @@ public class RMAppImpl implements RMApp, Recoverable {
   private long storedFinishTime = 0;
   private int firstAttemptIdInStateStore = 1;
   private int nextAttemptId = 1;
-  private String collectorAddr;
+  private AppCollectorData collectorData;
   // This field isn't protected by readlock now.
   private volatile RMAppAttempt currentAttempt;
   private String queue;
@@ -201,8 +202,6 @@ public class RMAppImpl implements RMApp, Recoverable {
      // Transitions from NEW state
     .addTransition(RMAppState.NEW, RMAppState.NEW,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
-    .addTransition(RMAppState.NEW, RMAppState.NEW,
-        RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
     .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
         RMAppEventType.START, new RMAppNewlySavingTransition())
     .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
@@ -219,8 +218,6 @@ public class RMAppImpl implements RMApp, Recoverable {
     // Transitions from NEW_SAVING state
     .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
-    .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
-        RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
     .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
         RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
     .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
@@ -239,8 +236,6 @@ public class RMAppImpl implements RMApp, Recoverable {
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
     .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
         RMAppEventType.MOVE, new RMAppMoveTransition())
-    .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
-        RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
     .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
         RMAppEventType.APP_REJECTED,
         new FinalSavingTransition(
@@ -257,8 +252,6 @@ public class RMAppImpl implements RMApp, Recoverable {
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
     .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
         RMAppEventType.MOVE, new RMAppMoveTransition())
-    .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
-        RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
     .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
         RMAppEventType.ATTEMPT_REGISTERED, new RMAppStateUpdateTransition(
             YarnApplicationState.RUNNING))
@@ -286,8 +279,6 @@ public class RMAppImpl implements RMApp, Recoverable {
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
     .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
         RMAppEventType.MOVE, new RMAppMoveTransition())
-    .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
-        RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
     .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
         RMAppEventType.ATTEMPT_UNREGISTERED,
         new FinalSavingTransition(
@@ -317,8 +308,6 @@ public class RMAppImpl implements RMApp, Recoverable {
     .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, 
         RMAppEventType.APP_RUNNING_ON_NODE,
         new AppRunningOnNodeTransition())
-    .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
-        RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
     // ignorable transitions
     .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
         EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
@@ -330,8 +319,6 @@ public class RMAppImpl implements RMApp, Recoverable {
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, 
         RMAppEventType.APP_RUNNING_ON_NODE,
         new AppRunningOnNodeTransition())
-    .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
-        RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
     // ignorable transitions
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
       EnumSet.of(RMAppEventType.NODE_UPDATE,
@@ -343,8 +330,6 @@ public class RMAppImpl implements RMApp, Recoverable {
     .addTransition(RMAppState.KILLING, RMAppState.KILLING, 
         RMAppEventType.APP_RUNNING_ON_NODE,
         new AppRunningOnNodeTransition())
-    .addTransition(RMAppState.KILLING, RMAppState.KILLING,
-        RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
     .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
         RMAppEventType.ATTEMPT_KILLED,
         new FinalSavingTransition(
@@ -614,18 +599,16 @@ public class RMAppImpl implements RMApp, Recoverable {
   }
 
   @Override
-  public String getCollectorAddr() {
-    return this.collectorAddr;
+  public AppCollectorData getCollectorData() {
+    return this.collectorData;
   }
 
-  @Override
-  public void setCollectorAddr(String collectorAddress) {
-    this.collectorAddr = collectorAddress;
+  public void setCollectorData(AppCollectorData incomingData) {
+    this.collectorData = incomingData;
   }
 
-  @Override
-  public void removeCollectorAddr() {
-    this.collectorAddr = null;
+  public void removeCollectorData() {
+    this.collectorData = null;
   }
 
   @Override
@@ -972,24 +955,6 @@ public class RMAppImpl implements RMApp, Recoverable {
     };
   }
 
-  private static final class RMAppCollectorUpdateTransition
-      extends RMAppTransition {
-
-    public void transition(RMAppImpl app, RMAppEvent event) {
-      if (YarnConfiguration.timelineServiceV2Enabled(app.conf)) {
-        LOG.info("Updating collector info for app: " + app.getApplicationId());
-
-        RMAppCollectorUpdateEvent appCollectorUpdateEvent =
-            (RMAppCollectorUpdateEvent) event;
-        // Update collector address
-        app.setCollectorAddr(appCollectorUpdateEvent.getAppCollectorAddr());
-
-        // TODO persistent to RMStateStore for recover
-        // Save to RMStateStore
-      }
-    };
-  }
-
   private static final class RMAppNodeUpdateTransition extends RMAppTransition {
     public void transition(RMAppImpl app, RMAppEvent event) {
       RMAppNodeUpdateEvent nodeUpdateEvent = (RMAppNodeUpdateEvent) event;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index 098ba54..3ced3fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@@ -68,6 +69,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
@@ -896,13 +898,23 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     RMNodeImpl node2 =
         (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm2.getNodeId());
 
-    RMApp app1 = rm.submitApp(1024);
+    RMAppImpl app1 = (RMAppImpl) rm.submitApp(1024);
     String collectorAddr1 = "1.2.3.4:5";
-    app1.setCollectorAddr(collectorAddr1);
+    app1.setCollectorData(AppCollectorData.newInstance(
+        app1.getApplicationId(), collectorAddr1));
 
     String collectorAddr2 = "5.4.3.2:1";
-    RMApp app2 = rm.submitApp(1024);
-    app2.setCollectorAddr(collectorAddr2);
+    RMAppImpl app2 = (RMAppImpl) rm.submitApp(1024);
+    app2.setCollectorData(AppCollectorData.newInstance(
+        app2.getApplicationId(), collectorAddr2));
+
+    String collectorAddr3 = "5.4.3.2:2";
+    app2.setCollectorData(AppCollectorData.newInstance(
+        app2.getApplicationId(), collectorAddr3, 0, 1));
+
+    String collectorAddr4 = "5.4.3.2:3";
+    app2.setCollectorData(AppCollectorData.newInstance(
+        app2.getApplicationId(), collectorAddr4, 1, 0));
 
     // Create a running container for app1 running on nm1
     ContainerId runningContainerId1 = BuilderUtils.newContainerId(
@@ -940,14 +952,18 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     Assert.assertEquals(app2.getApplicationId(), node2.getRunningApps().get(0));
 
     nodeHeartbeat1 = nm1.nodeHeartbeat(true);
-    Map<ApplicationId, String> map1 = nodeHeartbeat1.getAppCollectorsMap();
+    Map<ApplicationId, AppCollectorData> map1
+        = nodeHeartbeat1.getAppCollectors();
     Assert.assertEquals(1, map1.size());
-    Assert.assertEquals(collectorAddr1, map1.get(app1.getApplicationId()));
+    Assert.assertEquals(collectorAddr1,
+        map1.get(app1.getApplicationId()).getCollectorAddr());
 
     nodeHeartbeat2 = nm2.nodeHeartbeat(true);
-    Map<ApplicationId, String> map2 = nodeHeartbeat2.getAppCollectorsMap();
+    Map<ApplicationId, AppCollectorData> map2
+        = nodeHeartbeat2.getAppCollectors();
     Assert.assertEquals(1, map2.size());
-    Assert.assertEquals(collectorAddr2, map2.get(app2.getApplicationId()));
+    Assert.assertEquals(collectorAddr4,
+        map2.get(app2.getApplicationId()).getCollectorAddr());
   }
 
   private void checkRebootedNMCount(MockRM rm2, int count)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index 19ee0b1..6274573 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@@ -95,15 +96,7 @@ public abstract class MockAsm extends MockApps {
       throw new UnsupportedOperationException("Not supported yet.");
     }
     @Override
-    public String getCollectorAddr() {
-      throw new UnsupportedOperationException("Not supported yet.");
-    }
-    @Override
-    public void setCollectorAddr(String collectorAddr) {
-      throw new UnsupportedOperationException("Not supported yet.");
-    }
-    @Override
-    public void removeCollectorAddr() {
+    public AppCollectorData getCollectorData() {
       throw new UnsupportedOperationException("Not supported yet.");
     }
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index 62a5c52..8c8d09e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
@@ -302,17 +303,9 @@ public class MockRMApp implements RMApp {
     throw new UnsupportedOperationException("Not supported yet.");
   }
 
-  public String getCollectorAddr() {
-    throw new UnsupportedOperationException("Not supported yet.");
-  }
-
   @Override
-  public void removeCollectorAddr() {
+  public AppCollectorData getCollectorData() {
     throw new UnsupportedOperationException("Not supported yet.");
   }
 
-  @Override
-  public void setCollectorAddr(String collectorAddr) {
-    throw new UnsupportedOperationException("Not supported yet.");
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/2] hadoop git commit: YARN-5638. Introduce a collector timestamp to uniquely identify collectors creation order in collector discovery. Contributed by Li Lu.

Posted by sj...@apache.org.
YARN-5638. Introduce a collector timestamp to uniquely identify collectors creation order in collector discovery. Contributed by Li Lu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0c186314
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0c186314
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0c186314

Branch: refs/heads/YARN-5355
Commit: 0c1863144649ea265da65ce25158707cc3a3fb4a
Parents: 5d7ad39
Author: Sangjin Lee <sj...@apache.org>
Authored: Fri Oct 14 14:40:05 2016 -0700
Committer: Sangjin Lee <sj...@apache.org>
Committed: Fri Oct 14 14:40:05 2016 -0700

----------------------------------------------------------------------
 .../protocolrecords/NodeHeartbeatRequest.java   |  13 +-
 .../protocolrecords/NodeHeartbeatResponse.java  |   6 +-
 .../ReportNewCollectorInfoRequest.java          |  10 +-
 .../impl/pb/NodeHeartbeatRequestPBImpl.java     |  47 +++--
 .../impl/pb/NodeHeartbeatResponsePBImpl.java    |  37 ++--
 .../pb/ReportNewCollectorInfoRequestPBImpl.java |  36 ++--
 .../server/api/records/AppCollectorData.java    | 104 ++++++++++
 .../server/api/records/AppCollectorsMap.java    |  46 -----
 .../records/impl/pb/AppCollectorDataPBImpl.java | 200 +++++++++++++++++++
 .../records/impl/pb/AppCollectorsMapPBImpl.java | 152 --------------
 .../api/records/impl/pb/package-info.java       |  19 ++
 .../yarn_server_common_service_protos.proto     |  14 +-
 .../java/org/apache/hadoop/yarn/TestRPC.java    |   6 +-
 .../hadoop/yarn/TestYarnServerApiClasses.java   |  22 +-
 .../hadoop/yarn/server/nodemanager/Context.java |  14 +-
 .../yarn/server/nodemanager/NodeManager.java    |  23 +--
 .../nodemanager/NodeStatusUpdaterImpl.java      |  59 +++---
 .../collectormanager/NMCollectorService.java    |  29 ++-
 .../application/ApplicationImpl.java            |  33 ++-
 .../amrmproxy/BaseAMRMProxyTest.java            |   8 +-
 .../ApplicationMasterService.java               |  10 +-
 .../resourcemanager/ResourceTrackerService.java |  62 +++---
 .../server/resourcemanager/rmapp/RMApp.java     |  30 +--
 .../rmapp/RMAppCollectorUpdateEvent.java        |  40 ----
 .../resourcemanager/rmapp/RMAppEventType.java   |   3 -
 .../server/resourcemanager/rmapp/RMAppImpl.java |  51 +----
 .../TestResourceTrackerService.java             |  32 ++-
 .../applicationsmanager/MockAsm.java            |  11 +-
 .../server/resourcemanager/rmapp/MockRMApp.java |  11 +-
 29 files changed, 621 insertions(+), 507 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
index c795e55..f238f79 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
@@ -24,6 +24,7 @@ import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.util.Records;
@@ -47,7 +48,7 @@ public abstract class NodeHeartbeatRequest {
   public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
       MasterKey lastKnownContainerTokenMasterKey,
       MasterKey lastKnownNMTokenMasterKey, Set<NodeLabel> nodeLabels,
-      Map<ApplicationId, String> registeredCollectors) {
+      Map<ApplicationId, AppCollectorData> registeringCollectors) {
     NodeHeartbeatRequest nodeHeartbeatRequest =
         Records.newRecord(NodeHeartbeatRequest.class);
     nodeHeartbeatRequest.setNodeStatus(nodeStatus);
@@ -56,7 +57,7 @@ public abstract class NodeHeartbeatRequest {
     nodeHeartbeatRequest
         .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey);
     nodeHeartbeatRequest.setNodeLabels(nodeLabels);
-    nodeHeartbeatRequest.setRegisteredCollectors(registeredCollectors);
+    nodeHeartbeatRequest.setRegisteringCollectors(registeringCollectors);
     return nodeHeartbeatRequest;
   }
 
@@ -79,7 +80,9 @@ public abstract class NodeHeartbeatRequest {
       List<LogAggregationReport> logAggregationReportsForApps);
 
   // This tells RM registered collectors' address info on this node
-  public abstract Map<ApplicationId, String> getRegisteredCollectors();
-  public abstract void setRegisteredCollectors(Map<ApplicationId,
-      String> appCollectorsMap);
+  public abstract Map<ApplicationId, AppCollectorData>
+      getRegisteringCollectors();
+
+  public abstract void setRegisteringCollectors(Map<ApplicationId,
+      AppCollectorData> appCollectorsMap);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
index 09cafaf..f26ae79 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
@@ -42,8 +43,9 @@ public interface NodeHeartbeatResponse {
   List<ApplicationId> getApplicationsToCleanup();
 
   // This tells NM the collectors' address info of related apps
-  Map<ApplicationId, String> getAppCollectorsMap();
-  void setAppCollectorsMap(Map<ApplicationId, String> appCollectorsMap);
+  Map<ApplicationId, AppCollectorData> getAppCollectors();
+  void setAppCollectors(
+      Map<ApplicationId, AppCollectorData> appCollectorsMap);
 
   void setResponseId(int responseId);
   void setNodeAction(NodeAction action);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java
index 3498de9..1503eca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java
@@ -22,14 +22,14 @@ import java.util.Arrays;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.util.Records;
 
 @Private
 public abstract class ReportNewCollectorInfoRequest {
 
   public static ReportNewCollectorInfoRequest newInstance(
-      List<AppCollectorsMap> appCollectorsList) {
+      List<AppCollectorData> appCollectorsList) {
     ReportNewCollectorInfoRequest request =
         Records.newRecord(ReportNewCollectorInfoRequest.class);
     request.setAppCollectorsList(appCollectorsList);
@@ -41,13 +41,13 @@ public abstract class ReportNewCollectorInfoRequest {
     ReportNewCollectorInfoRequest request =
         Records.newRecord(ReportNewCollectorInfoRequest.class);
     request.setAppCollectorsList(
-        Arrays.asList(AppCollectorsMap.newInstance(id, collectorAddr)));
+        Arrays.asList(AppCollectorData.newInstance(id, collectorAddr)));
     return request;
   }
 
-  public abstract List<AppCollectorsMap> getAppCollectorsList();
+  public abstract List<AppCollectorData> getAppCollectorsList();
 
   public abstract void setAppCollectorsList(
-      List<AppCollectorsMap> appCollectorsList);
+      List<AppCollectorData> appCollectorsList);
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
index d0c1198..73a8abe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
@@ -35,13 +35,14 @@ import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
@@ -58,7 +59,7 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
   private Set<NodeLabel> labels = null;
   private List<LogAggregationReport> logAggregationReportsForApps = null;
 
-  private Map<ApplicationId, String> registeredCollectors = null;
+  private Map<ApplicationId, AppCollectorData> registeringCollectors = null;
 
   public NodeHeartbeatRequestPBImpl() {
     builder = NodeHeartbeatRequestProto.newBuilder();
@@ -114,8 +115,8 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
     if (this.logAggregationReportsForApps != null) {
       addLogAggregationStatusForAppsToProto();
     }
-    if (this.registeredCollectors != null) {
-      addRegisteredCollectorsToProto();
+    if (this.registeringCollectors != null) {
+      addRegisteringCollectorsToProto();
     }
   }
 
@@ -158,14 +159,14 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
     return ((LogAggregationReportPBImpl) value).getProto();
   }
 
-  private void addRegisteredCollectorsToProto() {
+  private void addRegisteringCollectorsToProto() {
     maybeInitBuilder();
-    builder.clearRegisteredCollectors();
-    for (Map.Entry<ApplicationId, String> entry :
-        registeredCollectors.entrySet()) {
-      builder.addRegisteredCollectors(AppCollectorsMapProto.newBuilder()
+    builder.clearRegisteringCollectors();
+    for (Map.Entry<ApplicationId, AppCollectorData> entry :
+        registeringCollectors.entrySet()) {
+      builder.addRegisteringCollectors(AppCollectorDataProto.newBuilder()
           .setAppId(convertToProtoFormat(entry.getKey()))
-          .setAppCollectorAddr(entry.getValue()));
+          .setAppCollectorAddr(entry.getValue().getCollectorAddr()));
     }
   }
 
@@ -251,35 +252,37 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
   }
 
   @Override
-  public Map<ApplicationId, String> getRegisteredCollectors() {
-    if (this.registeredCollectors != null) {
-      return this.registeredCollectors;
+  public Map<ApplicationId, AppCollectorData> getRegisteringCollectors() {
+    if (this.registeringCollectors != null) {
+      return this.registeringCollectors;
     }
     initRegisteredCollectors();
-    return registeredCollectors;
+    return registeringCollectors;
   }
 
   private void initRegisteredCollectors() {
     NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
-    List<AppCollectorsMapProto> list = p.getRegisteredCollectorsList();
+    List<AppCollectorDataProto> list = p.getRegisteringCollectorsList();
     if (!list.isEmpty()) {
-      this.registeredCollectors = new HashMap<>();
-      for (AppCollectorsMapProto c : list) {
+      this.registeringCollectors = new HashMap<>();
+      for (AppCollectorDataProto c : list) {
         ApplicationId appId = convertFromProtoFormat(c.getAppId());
-        this.registeredCollectors.put(appId, c.getAppCollectorAddr());
+        AppCollectorData data = AppCollectorData.newInstance(appId,
+            c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion());
+        this.registeringCollectors.put(appId, data);
       }
     }
   }
 
   @Override
-  public void setRegisteredCollectors(
-      Map<ApplicationId, String> registeredCollectors) {
+  public void setRegisteringCollectors(
+      Map<ApplicationId, AppCollectorData> registeredCollectors) {
     if (registeredCollectors == null || registeredCollectors.isEmpty()) {
       return;
     }
     maybeInitBuilder();
-    this.registeredCollectors = new HashMap<ApplicationId, String>();
-    this.registeredCollectors.putAll(registeredCollectors);
+    this.registeringCollectors = new HashMap<>();
+    this.registeringCollectors.putAll(registeredCollectors);
   }
 
   private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto p) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index cd85241..bc4e802 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -46,11 +46,12 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueui
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
@@ -69,7 +70,7 @@ public class NodeHeartbeatResponsePBImpl extends
   private List<ApplicationId> applicationsToCleanup = null;
   private Map<ApplicationId, ByteBuffer> systemCredentials = null;
   private Resource resource = null;
-  private Map<ApplicationId, String> appCollectorsMap = null;
+  private Map<ApplicationId, AppCollectorData> appCollectorsMap = null;
 
   private MasterKey containerTokenMasterKey = null;
   private MasterKey nmTokenMasterKey = null;
@@ -145,11 +146,15 @@ public class NodeHeartbeatResponsePBImpl extends
 
   private void addAppCollectorsMapToProto() {
     maybeInitBuilder();
-    builder.clearAppCollectorsMap();
-    for (Map.Entry<ApplicationId, String> entry : appCollectorsMap.entrySet()) {
-      builder.addAppCollectorsMap(AppCollectorsMapProto.newBuilder()
+    builder.clearAppCollectors();
+    for (Map.Entry<ApplicationId, AppCollectorData> entry
+        : appCollectorsMap.entrySet()) {
+      AppCollectorData data = entry.getValue();
+      builder.addAppCollectors(AppCollectorDataProto.newBuilder()
           .setAppId(convertToProtoFormat(entry.getKey()))
-          .setAppCollectorAddr(entry.getValue()));
+          .setAppCollectorAddr(data.getCollectorAddr())
+          .setRmIdentifier(data.getRMIdentifier())
+          .setVersion(data.getVersion()));
     }
   }
 
@@ -567,7 +572,7 @@ public class NodeHeartbeatResponsePBImpl extends
   }
 
   @Override
-  public Map<ApplicationId, String> getAppCollectorsMap() {
+  public Map<ApplicationId, AppCollectorData> getAppCollectors() {
     if (this.appCollectorsMap != null) {
       return this.appCollectorsMap;
     }
@@ -588,12 +593,14 @@ public class NodeHeartbeatResponsePBImpl extends
 
   private void initAppCollectorsMap() {
     NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
-    List<AppCollectorsMapProto> list = p.getAppCollectorsMapList();
+    List<AppCollectorDataProto> list = p.getAppCollectorsList();
     if (!list.isEmpty()) {
       this.appCollectorsMap = new HashMap<>();
-      for (AppCollectorsMapProto c : list) {
+      for (AppCollectorDataProto c : list) {
         ApplicationId appId = convertFromProtoFormat(c.getAppId());
-        this.appCollectorsMap.put(appId, c.getAppCollectorAddr());
+        AppCollectorData data = AppCollectorData.newInstance(appId,
+            c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion());
+        this.appCollectorsMap.put(appId, data);
       }
     }
   }
@@ -610,14 +617,14 @@ public class NodeHeartbeatResponsePBImpl extends
   }
 
   @Override
-  public void setAppCollectorsMap(
-      Map<ApplicationId, String> appCollectorsMap) {
-    if (appCollectorsMap == null || appCollectorsMap.isEmpty()) {
+  public void setAppCollectors(
+      Map<ApplicationId, AppCollectorData> appCollectors) {
+    if (appCollectors == null || appCollectors.isEmpty()) {
       return;
     }
     maybeInitBuilder();
-    this.appCollectorsMap = new HashMap<ApplicationId, String>();
-    this.appCollectorsMap.putAll(appCollectorsMap);
+    this.appCollectorsMap = new HashMap<>();
+    this.appCollectorsMap.putAll(appCollectors);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java
index c6f6619..3f3dcf5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java
@@ -20,12 +20,12 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProtoOrBuilder;
 import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
-import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorsMapPBImpl;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorDataPBImpl;
 
 public class ReportNewCollectorInfoRequestPBImpl extends
     ReportNewCollectorInfoRequest {
@@ -36,7 +36,7 @@ public class ReportNewCollectorInfoRequestPBImpl extends
   private ReportNewCollectorInfoRequestProto.Builder builder = null;
   private boolean viaProto = false;
 
-  private List<AppCollectorsMap> collectorsList = null;
+  private List<AppCollectorData> collectorsList = null;
 
   public ReportNewCollectorInfoRequestPBImpl() {
     builder = ReportNewCollectorInfoRequestProto.newBuilder();
@@ -96,9 +96,9 @@ public class ReportNewCollectorInfoRequestPBImpl extends
   private void addLocalCollectorsToProto() {
     maybeInitBuilder();
     builder.clearAppCollectors();
-    List<AppCollectorsMapProto> protoList =
-        new ArrayList<AppCollectorsMapProto>();
-    for (AppCollectorsMap m : this.collectorsList) {
+    List<AppCollectorDataProto> protoList =
+        new ArrayList<AppCollectorDataProto>();
+    for (AppCollectorData m : this.collectorsList) {
       protoList.add(convertToProtoFormat(m));
     }
     builder.addAllAppCollectors(protoList);
@@ -106,16 +106,16 @@ public class ReportNewCollectorInfoRequestPBImpl extends
 
   private void initLocalCollectorsList() {
     ReportNewCollectorInfoRequestProtoOrBuilder p = viaProto ? proto : builder;
-    List<AppCollectorsMapProto> list =
+    List<AppCollectorDataProto> list =
         p.getAppCollectorsList();
-    this.collectorsList = new ArrayList<AppCollectorsMap>();
-    for (AppCollectorsMapProto m : list) {
+    this.collectorsList = new ArrayList<AppCollectorData>();
+    for (AppCollectorDataProto m : list) {
       this.collectorsList.add(convertFromProtoFormat(m));
     }
   }
 
   @Override
-  public List<AppCollectorsMap> getAppCollectorsList() {
+  public List<AppCollectorData> getAppCollectorsList() {
     if (this.collectorsList == null) {
       initLocalCollectorsList();
     }
@@ -123,7 +123,7 @@ public class ReportNewCollectorInfoRequestPBImpl extends
   }
 
   @Override
-  public void setAppCollectorsList(List<AppCollectorsMap> appCollectorsList) {
+  public void setAppCollectorsList(List<AppCollectorData> appCollectorsList) {
     maybeInitBuilder();
     if (appCollectorsList == null) {
       builder.clearAppCollectors();
@@ -131,14 +131,14 @@ public class ReportNewCollectorInfoRequestPBImpl extends
     this.collectorsList = appCollectorsList;
   }
 
-  private AppCollectorsMapPBImpl convertFromProtoFormat(
-      AppCollectorsMapProto p) {
-    return new AppCollectorsMapPBImpl(p);
+  private AppCollectorDataPBImpl convertFromProtoFormat(
+      AppCollectorDataProto p) {
+    return new AppCollectorDataPBImpl(p);
   }
 
-  private AppCollectorsMapProto convertToProtoFormat(
-      AppCollectorsMap m) {
-    return ((AppCollectorsMapPBImpl) m).getProto();
+  private AppCollectorDataProto convertToProtoFormat(
+      AppCollectorData m) {
+    return ((AppCollectorDataPBImpl) m).getProto();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java
new file mode 100644
index 0000000..da2e5de
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.Records;
+
+
+@Private
+@InterfaceStability.Unstable
+public abstract class AppCollectorData {
+
+  protected static final long DEFAULT_TIMESTAMP_VALUE = -1;
+
+  public static AppCollectorData newInstance(
+      ApplicationId id, String collectorAddr, long rmIdentifier, long version) {
+    AppCollectorData appCollectorData =
+        Records.newRecord(AppCollectorData.class);
+    appCollectorData.setApplicationId(id);
+    appCollectorData.setCollectorAddr(collectorAddr);
+    appCollectorData.setRMIdentifier(rmIdentifier);
+    appCollectorData.setVersion(version);
+    return appCollectorData;
+  }
+
+  public static AppCollectorData newInstance(ApplicationId id,
+      String collectorAddr) {
+    return newInstance(id, collectorAddr, DEFAULT_TIMESTAMP_VALUE,
+        DEFAULT_TIMESTAMP_VALUE);
+  }
+
+  /**
+   * Returns if a collector data item happens before another one. Null data
+   * items happens before any other non-null items. Non-null data items A
+   * happens before another non-null item B when A's rmIdentifier is less than
+   * B's rmIdentifier. Or A's version is less than B's if they have the same
+   * rmIdentifier.
+   *
+   * @param dataA first collector data item.
+   * @param dataB second collector data item.
+   * @return true if dataA happens before dataB.
+   */
+  public static boolean happensBefore(AppCollectorData dataA,
+      AppCollectorData dataB) {
+    if (dataA == null && dataB == null) {
+      return false;
+    } else if (dataA == null || dataB == null) {
+      return dataA == null;
+    }
+
+    return
+        (dataA.getRMIdentifier() < dataB.getRMIdentifier())
+        || ((dataA.getRMIdentifier() == dataB.getRMIdentifier())
+            && (dataA.getVersion() < dataB.getVersion()));
+  }
+
+  /**
+   * Returns if the collector data has been stamped by the RM with a RM cluster
+   * timestamp and a version number.
+   *
+   * @return true if RM has already assigned a timestamp for this collector.
+   * Otherwise, it means the RM has not recognized the existence of this
+   * collector.
+   */
+  public boolean isStamped() {
+    return (getRMIdentifier() != DEFAULT_TIMESTAMP_VALUE)
+        || (getVersion() != DEFAULT_TIMESTAMP_VALUE);
+  }
+
+  public abstract ApplicationId getApplicationId();
+
+  public abstract void setApplicationId(ApplicationId id);
+
+  public abstract String getCollectorAddr();
+
+  public abstract void setCollectorAddr(String addr);
+
+  public abstract long getRMIdentifier();
+
+  public abstract void setRMIdentifier(long rmId);
+
+  public abstract long getVersion();
+
+  public abstract void setVersion(long version);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java
deleted file mode 100644
index 07e1d92..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.api.records;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.util.Records;
-
-
-@Private
-public abstract class AppCollectorsMap {
-
-  public static AppCollectorsMap newInstance(
-      ApplicationId id, String collectorAddr) {
-    AppCollectorsMap appCollectorsMap =
-        Records.newRecord(AppCollectorsMap.class);
-    appCollectorsMap.setApplicationId(id);
-    appCollectorsMap.setCollectorAddr(collectorAddr);
-    return appCollectorsMap;
-  }
-
-  public abstract ApplicationId getApplicationId();
-
-  public abstract void setApplicationId(ApplicationId id);
-
-  public abstract String getCollectorAddr();
-
-  public abstract void setCollectorAddr(String addr);
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java
new file mode 100644
index 0000000..7d3a805
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java
@@ -0,0 +1,200 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.api.records.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
+
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProtoOrBuilder;
+
+import com.google.protobuf.TextFormat;
+
+@Private
+@Unstable
+public class AppCollectorDataPBImpl extends AppCollectorData {
+
+  private AppCollectorDataProto proto =
+      AppCollectorDataProto.getDefaultInstance();
+
+  private AppCollectorDataProto.Builder builder = null;
+  private boolean viaProto = false;
+
+  private ApplicationId appId = null;
+  private String collectorAddr = null;
+  private Long rmIdentifier = null;
+  private Long version = null;
+
+  public AppCollectorDataPBImpl() {
+    builder = AppCollectorDataProto.newBuilder();
+  }
+
+  public AppCollectorDataPBImpl(AppCollectorDataProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public AppCollectorDataProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+
+  @Override
+  public ApplicationId getApplicationId() {
+    AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.appId == null && p.hasAppId()) {
+      this.appId = convertFromProtoFormat(p.getAppId());
+    }
+    return this.appId;
+  }
+
+  @Override
+  public String getCollectorAddr() {
+    AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.collectorAddr == null
+        && p.hasAppCollectorAddr()) {
+      this.collectorAddr = p.getAppCollectorAddr();
+    }
+    return this.collectorAddr;
+  }
+
+  @Override
+  public void setApplicationId(ApplicationId id) {
+    maybeInitBuilder();
+    if (id == null) {
+      builder.clearAppId();
+    }
+    this.appId = id;
+  }
+
+  @Override
+  public void setCollectorAddr(String collectorAddr) {
+    maybeInitBuilder();
+    if (collectorAddr == null) {
+      builder.clearAppCollectorAddr();
+    }
+    this.collectorAddr = collectorAddr;
+  }
+
+  @Override
+  public long getRMIdentifier() {
+    AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.rmIdentifier == null && p.hasRmIdentifier()) {
+      this.rmIdentifier = p.getRmIdentifier();
+    }
+    if (this.rmIdentifier != null) {
+      return this.rmIdentifier;
+    } else {
+      return AppCollectorData.DEFAULT_TIMESTAMP_VALUE;
+    }
+  }
+
+  @Override
+  public void setRMIdentifier(long rmId) {
+    maybeInitBuilder();
+    this.rmIdentifier = rmId;
+    builder.setRmIdentifier(rmId);
+  }
+
+  @Override
+  public long getVersion() {
+    AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.version == null && p.hasRmIdentifier()) {
+      this.version = p.getRmIdentifier();
+    }
+    if (this.version != null) {
+      return this.version;
+    } else {
+      return AppCollectorData.DEFAULT_TIMESTAMP_VALUE;
+    }
+  }
+
+  @Override
+  public void setVersion(long version) {
+    maybeInitBuilder();
+    this.version = version;
+    builder.setVersion(version);
+  }
+
+  private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
+    return new ApplicationIdPBImpl(p);
+  }
+
+  private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+    return ((ApplicationIdPBImpl) t).getProto();
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = AppCollectorDataProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void mergeLocalToBuilder() {
+    if (this.appId != null) {
+      builder.setAppId(convertToProtoFormat(this.appId));
+    }
+    if (this.collectorAddr != null) {
+      builder.setAppCollectorAddr(this.collectorAddr);
+    }
+    if (this.rmIdentifier != null) {
+      builder.setRmIdentifier(this.rmIdentifier);
+    }
+    if (this.version != null) {
+      builder.setVersion(this.version);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java
deleted file mode 100644
index 3740035..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.hadoop.yarn.server.api.records.impl.pb;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
-
-import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProtoOrBuilder;
-
-import com.google.protobuf.TextFormat;
-
-@Private
-@Unstable
-public class AppCollectorsMapPBImpl extends AppCollectorsMap {
-
-  private AppCollectorsMapProto proto =
-      AppCollectorsMapProto.getDefaultInstance();
-
-  private AppCollectorsMapProto.Builder builder = null;
-  private boolean viaProto = false;
-
-  private ApplicationId appId = null;
-  private String collectorAddr = null;
-
-  public AppCollectorsMapPBImpl() {
-    builder = AppCollectorsMapProto.newBuilder();
-  }
-
-  public AppCollectorsMapPBImpl(AppCollectorsMapProto proto) {
-    this.proto = proto;
-    viaProto = true;
-  }
-
-  public AppCollectorsMapProto getProto() {
-    mergeLocalToProto();
-    proto = viaProto ? proto : builder.build();
-    viaProto = true;
-    return proto;
-  }
-
-  @Override
-  public int hashCode() {
-    return getProto().hashCode();
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other == null) {
-      return false;
-    }
-    if (other.getClass().isAssignableFrom(this.getClass())) {
-      return this.getProto().equals(this.getClass().cast(other).getProto());
-    }
-    return false;
-  }
-
-  @Override
-  public String toString() {
-    return TextFormat.shortDebugString(getProto());
-  }
-
-  @Override
-  public ApplicationId getApplicationId() {
-    AppCollectorsMapProtoOrBuilder p = viaProto ? proto : builder;
-    if (this.appId == null && p.hasAppId()) {
-      this.appId = convertFromProtoFormat(p.getAppId());
-    }
-    return this.appId;
-  }
-
-  @Override
-  public String getCollectorAddr() {
-    AppCollectorsMapProtoOrBuilder p = viaProto ? proto : builder;
-    if (this.collectorAddr == null
-        && p.hasAppCollectorAddr()) {
-      this.collectorAddr = p.getAppCollectorAddr();
-    }
-    return this.collectorAddr;
-  }
-
-  @Override
-  public void setApplicationId(ApplicationId id) {
-    maybeInitBuilder();
-    if (id == null) {
-      builder.clearAppId();
-    }
-    this.appId = id;
-  }
-
-  @Override
-  public void setCollectorAddr(String collectorAddr) {
-    maybeInitBuilder();
-    if (collectorAddr == null) {
-      builder.clearAppCollectorAddr();
-    }
-    this.collectorAddr = collectorAddr;
-  }
-
-  private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
-    return new ApplicationIdPBImpl(p);
-  }
-
-  private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
-    return ((ApplicationIdPBImpl) t).getProto();
-  }
-
-  private void maybeInitBuilder() {
-    if (viaProto || builder == null) {
-      builder = AppCollectorsMapProto.newBuilder(proto);
-    }
-    viaProto = false;
-  }
-
-  private void mergeLocalToProto() {
-    if (viaProto) {
-      maybeInitBuilder();
-    }
-    mergeLocalToBuilder();
-    proto = builder.build();
-    viaProto = true;
-  }
-
-  private void mergeLocalToBuilder() {
-    if (this.appId != null) {
-      builder.setAppId(convertToProtoFormat(this.appId));
-    }
-    if (this.collectorAddr != null) {
-      builder.setAppCollectorAddr(this.collectorAddr);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/package-info.java
new file mode 100644
index 0000000..4ce3896
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** Server records PB implementations. */
+package org.apache.hadoop.yarn.server.api.records.impl.pb;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 3660252..932c885 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -84,7 +84,7 @@ message NodeHeartbeatRequestProto {
   optional MasterKeyProto last_known_nm_token_master_key = 3;
   optional NodeLabelsProto nodeLabels = 4;
   repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5;
-  repeated AppCollectorsMapProto registered_collectors = 6;
+  repeated AppCollectorDataProto registering_collectors = 6;
 }
 
 message LogAggregationReportProto {
@@ -109,7 +109,7 @@ message NodeHeartbeatResponseProto {
   repeated SignalContainerRequestProto containers_to_signal = 13;
   optional ResourceProto resource = 14;
   optional ContainerQueuingLimitProto container_queuing_limit = 15;
-  repeated AppCollectorsMapProto app_collectors_map = 16;
+  repeated AppCollectorDataProto app_collectors = 16;
 }
 
 message ContainerQueuingLimitProto {
@@ -125,16 +125,18 @@ message SystemCredentialsForAppsProto {
 ////////////////////////////////////////////////////////////////////////
 ////// From collector_nodemanager_protocol ////////////////////////////
 ////////////////////////////////////////////////////////////////////////
-message AppCollectorsMapProto {
-  optional ApplicationIdProto appId = 1;
-  optional string appCollectorAddr = 2;
+message AppCollectorDataProto {
+  optional ApplicationIdProto app_id = 1;
+  optional string app_collector_addr = 2;
+  optional int64 rm_identifier = 3 [default = -1];
+  optional int64 version = 4 [default = -1];
 }
 
 //////////////////////////////////////////////////////
 /////// collector_nodemanager_protocol //////////////
 //////////////////////////////////////////////////////
 message ReportNewCollectorInfoRequestProto {
-  repeated AppCollectorsMapProto app_collectors = 1;
+  repeated AppCollectorDataProto app_collectors = 1;
 }
 
 message ReportNewCollectorInfoResponseProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
index e25f528..ba434ea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
@@ -68,7 +68,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorCon
 import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.junit.Test;
@@ -382,10 +382,10 @@ public class TestRPC {
     public ReportNewCollectorInfoResponse reportNewCollectorInfo(
         ReportNewCollectorInfoRequest request)
         throws YarnException, IOException {
-      List<AppCollectorsMap> appCollectors = request.getAppCollectorsList();
+      List<AppCollectorData> appCollectors = request.getAppCollectorsList();
       if (appCollectors.size() == 1) {
         // check default appID and collectorAddr
-        AppCollectorsMap appCollector = appCollectors.get(0);
+        AppCollectorData appCollector = appCollectors.get(0);
         Assert.assertEquals(appCollector.getApplicationId(),
             DEFAULT_APP_ID);
         Assert.assertEquals(appCollector.getCollectorAddr(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
index be350e2..944fc0e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRe
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UnRegisterNodeManagerRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
@@ -109,14 +110,14 @@ public class TestYarnServerApiClasses {
     original.setLastKnownNMTokenMasterKey(getMasterKey());
     original.setNodeStatus(getNodeStatus());
     original.setNodeLabels(getValidNodeLabels());
-    Map<ApplicationId, String> collectors = getCollectors();
-    original.setRegisteredCollectors(collectors);
+    Map<ApplicationId, AppCollectorData> collectors = getCollectors();
+    original.setRegisteringCollectors(collectors);
     NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl(
         original.getProto());
     assertEquals(1, copy.getLastKnownContainerTokenMasterKey().getKeyId());
     assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId());
     assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost());
-    assertEquals(collectors, copy.getRegisteredCollectors());
+    assertEquals(collectors, copy.getRegisteringCollectors());
     // check labels are coming with valid values
     Assert.assertTrue(original.getNodeLabels()
         .containsAll(copy.getNodeLabels()));
@@ -153,8 +154,8 @@ public class TestYarnServerApiClasses {
     original.setNextHeartBeatInterval(1000);
     original.setNodeAction(NodeAction.NORMAL);
     original.setResponseId(100);
-    Map<ApplicationId, String> collectors = getCollectors();
-    original.setAppCollectorsMap(collectors);
+    Map<ApplicationId, AppCollectorData> collectors = getCollectors();
+    original.setAppCollectors(collectors);
 
     NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl(
         original.getProto());
@@ -164,7 +165,7 @@ public class TestYarnServerApiClasses {
     assertEquals(1, copy.getContainerTokenMasterKey().getKeyId());
     assertEquals(1, copy.getNMTokenMasterKey().getKeyId());
     assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());
-    assertEquals(collectors, copy.getAppCollectorsMap());
+    assertEquals(collectors, copy.getAppCollectors());
     assertEquals(false, copy.getAreNodeLabelsAcceptedByRM());
    }
 
@@ -344,12 +345,13 @@ public class TestYarnServerApiClasses {
     return nodeLabels;
   }
 
-  private Map<ApplicationId, String> getCollectors() {
+  private Map<ApplicationId, AppCollectorData> getCollectors() {
     ApplicationId appID = ApplicationId.newInstance(1L, 1);
     String collectorAddr = "localhost:0";
-    Map<ApplicationId, String> collectorMap =
-        new HashMap<ApplicationId, String>();
-    collectorMap.put(appID, collectorAddr);
+    AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr);
+    Map<ApplicationId, AppCollectorData> collectorMap =
+        new HashMap<>();
+    collectorMap.put(appID, data);
     return collectorMap;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index 131eaa3..a20b267 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@@ -74,11 +75,18 @@ public interface Context {
   Map<ApplicationId, Credentials> getSystemCredentialsForApps();
 
   /**
-   * Get the registered collectors that located on this NM.
-   * @return registered collectors, or null if the timeline service v.2 is not
+   * Get the list of collectors that are registering with the RM from this node.
+   * @return registering collectors, or null if the timeline service v.2 is not
    * enabled
    */
-  Map<ApplicationId, String> getRegisteredCollectors();
+  Map<ApplicationId, AppCollectorData> getRegisteringCollectors();
+
+  /**
+   * Get the list of collectors registered with the RM and known by this node.
+   * @return known collectors, or null if the timeline service v.2 is not
+   * enabled.
+   */
+  Map<ApplicationId, AppCollectorData> getKnownCollectors();
 
   ConcurrentMap<ContainerId, Container> getContainers();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 5bfbb8d..fd171c6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
 import org.apache.hadoop.yarn.server.nodemanager.collectormanager.NMCollectorService;
@@ -483,7 +484,9 @@ public class NodeManager extends CompositeService
     protected final ConcurrentMap<ContainerId, Container> containers =
         new ConcurrentSkipListMap<ContainerId, Container>();
 
-    private Map<ApplicationId, String> registeredCollectors;
+    private Map<ApplicationId, AppCollectorData> registeringCollectors;
+
+    private Map<ApplicationId, AppCollectorData> knownCollectors;
 
     protected final ConcurrentMap<ContainerId,
         org.apache.hadoop.yarn.api.records.Container> increasedContainers =
@@ -517,7 +520,8 @@ public class NodeManager extends CompositeService
         NMStateStoreService stateStore, boolean isDistSchedulingEnabled,
         Configuration conf) {
       if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
-        this.registeredCollectors = new ConcurrentHashMap<>();
+        this.registeringCollectors = new ConcurrentHashMap<>();
+        this.knownCollectors = new ConcurrentHashMap<>();
       }
       this.containerTokenSecretManager = containerTokenSecretManager;
       this.nmTokenSecretManager = nmTokenSecretManager;
@@ -678,18 +682,13 @@ public class NodeManager extends CompositeService
     }
 
     @Override
-    public Map<ApplicationId, String> getRegisteredCollectors() {
-      return this.registeredCollectors;
+    public Map<ApplicationId, AppCollectorData> getRegisteringCollectors() {
+      return this.registeringCollectors;
     }
 
-    public void addRegisteredCollectors(
-        Map<ApplicationId, String> newRegisteredCollectors) {
-      if (registeredCollectors != null) {
-        this.registeredCollectors.putAll(newRegisteredCollectors);
-      } else {
-        LOG.warn("collectors are added when the registered collectors are " +
-            "initialized");
-      }
+    @Override
+    public Map<ApplicationId, AppCollectorData> getKnownCollectors() {
+      return this.knownCollectors;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index f692bf1..116625b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequ
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
 
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
@@ -817,7 +818,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
                         .getNMTokenSecretManager().getCurrentKey(),
                     nodeLabelsForHeartbeat,
                     NodeStatusUpdaterImpl.this.context
-                        .getRegisteredCollectors());
+                        .getRegisteringCollectors());
 
             if (logAggregationEnabled) {
               // pull log aggregation status for application running in this NM
@@ -910,7 +911,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
               }
             }
             if (YarnConfiguration.timelineServiceV2Enabled(context.getConf())) {
-              updateTimelineClientsAddress(response);
+              updateTimelineCollectorData(response);
             }
 
           } catch (ConnectException e) {
@@ -940,40 +941,48 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
         }
       }
 
-      private void updateTimelineClientsAddress(
+      private void updateTimelineCollectorData(
           NodeHeartbeatResponse response) {
-        Map<ApplicationId, String> knownCollectorsMap =
-            response.getAppCollectorsMap();
-        if (knownCollectorsMap == null) {
+        Map<ApplicationId, AppCollectorData> incomingCollectorsMap =
+            response.getAppCollectors();
+        if (incomingCollectorsMap == null) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("No collectors to update RM");
           }
-        } else {
-          Set<Map.Entry<ApplicationId, String>> rmKnownCollectors =
-              knownCollectorsMap.entrySet();
-          for (Map.Entry<ApplicationId, String> entry : rmKnownCollectors) {
-            ApplicationId appId = entry.getKey();
-            String collectorAddr = entry.getValue();
-
-            // Only handle applications running on local node.
-            // Not include apps with timeline collectors running in local
-            Application application = context.getApplications().get(appId);
-            // TODO this logic could be problematic if the collector address
-            // gets updated due to NM restart or collector service failure
-            if (application != null &&
-                !context.getRegisteredCollectors().containsKey(appId)) {
+          return;
+        }
+        Map<ApplicationId, AppCollectorData> knownCollectors
+            = context.getKnownCollectors();
+        for (Map.Entry<ApplicationId, AppCollectorData> entry
+            : incomingCollectorsMap.entrySet()) {
+          ApplicationId appId = entry.getKey();
+          AppCollectorData collectorData = entry.getValue();
+          // Only handle applications running on local node.
+          Application application = context.getApplications().get(appId);
+          if (application != null) {
+            // Update collector data if the newly received data happens after
+            // the known data (updates the known data).
+            AppCollectorData existingData = knownCollectors.get(appId);
+            if (AppCollectorData.happensBefore(existingData, collectorData)) {
               if (LOG.isDebugEnabled()) {
-                LOG.debug("Sync a new collector address: " + collectorAddr +
-                    " for application: " + appId + " from RM.");
+                LOG.debug("Sync a new collector address: "
+                    + collectorData.getCollectorAddr()
+                    + " for application: " + appId + " from RM.");
               }
-              NMTimelinePublisher nmTimelinePublisher =
-                  context.getNMTimelinePublisher();
+
+              // Update information for clients.
+              NMTimelinePublisher nmTimelinePublisher
+                  = context.getNMTimelinePublisher();
               if (nmTimelinePublisher != null) {
                 nmTimelinePublisher.setTimelineServiceAddress(
-                    application.getAppId(), collectorAddr);
+                    application.getAppId(), collectorData.getCollectorAddr());
               }
+              // Update information for the node manager itself.
+              knownCollectors.put(appId, collectorData);
             }
           }
+          // Remove the registering collector data
+          context.getRegisteringCollectors().remove(entry.getKey());
         }
       }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
index d667c0e..7fdca78 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
@@ -37,9 +37,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorCon
 import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
-import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 
@@ -107,23 +106,31 @@ public class NMCollectorService extends CompositeService implements
   @Override
   public ReportNewCollectorInfoResponse reportNewCollectorInfo(
       ReportNewCollectorInfoRequest request) throws YarnException, IOException {
-    List<AppCollectorsMap> newCollectorsList = request.getAppCollectorsList();
+    List<AppCollectorData> newCollectorsList = request.getAppCollectorsList();
     if (newCollectorsList != null && !newCollectorsList.isEmpty()) {
-      Map<ApplicationId, String> newCollectorsMap =
-          new HashMap<ApplicationId, String>();
-      for (AppCollectorsMap collector : newCollectorsList) {
+      Map<ApplicationId, AppCollectorData> newCollectorsMap =
+          new HashMap<>();
+      for (AppCollectorData collector : newCollectorsList) {
         ApplicationId appId = collector.getApplicationId();
-        String collectorAddr = collector.getCollectorAddr();
-        newCollectorsMap.put(appId, collectorAddr);
+        newCollectorsMap.put(appId, collector);
         // set registered collector address to TimelineClient.
+        // TODO: Do we need to do this after we received confirmation from
+        // the RM?
         NMTimelinePublisher nmTimelinePublisher =
             context.getNMTimelinePublisher();
         if (nmTimelinePublisher != null) {
-          nmTimelinePublisher.setTimelineServiceAddress(appId, collectorAddr);
+          nmTimelinePublisher.setTimelineServiceAddress(appId,
+              collector.getCollectorAddr());
         }
       }
-      ((NodeManager.NMContext)context).addRegisteredCollectors(
-          newCollectorsMap);
+      Map<ApplicationId, AppCollectorData> registeringCollectors
+          = context.getRegisteringCollectors();
+      if (registeringCollectors != null) {
+        registeringCollectors.putAll(newCollectorsMap);
+      } else {
+        LOG.warn("collectors are added when the registered collectors are " +
+            "initialized");
+      }
     }
 
     return ReportNewCollectorInfoResponse.newInstance();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index 531693e..53ff8f2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
@@ -544,21 +545,20 @@ public class ApplicationImpl implements Application {
   @SuppressWarnings("unchecked")
   static class AppCompletelyDoneTransition implements
       SingleArcTransition<ApplicationImpl, ApplicationEvent> {
-    @Override
-    public void transition(ApplicationImpl app, ApplicationEvent event) {
 
-      // Inform the logService
-      app.dispatcher.getEventHandler().handle(
-          new LogHandlerAppFinishedEvent(app.appId));
-
-      app.context.getNMTokenSecretManager().appFinished(app.getAppId());
+    private void updateCollectorStatus(ApplicationImpl app) {
       // Remove collectors info for finished apps.
       // TODO check we remove related collectors info in failure cases
       // (YARN-3038)
-      Map<ApplicationId, String> registeredCollectors =
-          app.context.getRegisteredCollectors();
-      if (registeredCollectors != null) {
-        registeredCollectors.remove(app.getAppId());
+      Map<ApplicationId, AppCollectorData> registeringCollectors
+          = app.context.getRegisteringCollectors();
+      if (registeringCollectors != null) {
+        registeringCollectors.remove(app.getAppId());
+      }
+      Map<ApplicationId, AppCollectorData> knownCollectors =
+          app.context.getKnownCollectors();
+      if (knownCollectors != null) {
+        knownCollectors.remove(app.getAppId());
       }
       // stop timelineClient when application get finished.
       NMTimelinePublisher nmTimelinePublisher =
@@ -567,6 +567,17 @@ public class ApplicationImpl implements Application {
         nmTimelinePublisher.stopTimelineClient(app.getAppId());
       }
     }
+
+    @Override
+    public void transition(ApplicationImpl app, ApplicationEvent event) {
+
+      // Inform the logService
+      app.dispatcher.getEventHandler().handle(
+          new LogHandlerAppFinishedEvent(app.appId));
+
+      app.context.getNMTokenSecretManager().appFinished(app.getAppId());
+      updateCollectorStatus(app);
+    }
   }
 
   static class AppLogsAggregatedTransition implements

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index f716d44..94b3b07 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
@@ -617,8 +618,11 @@ public abstract class BaseAMRMProxyTest {
       return null;
     }
 
-    @Override
-    public Map<ApplicationId, String> getRegisteredCollectors() {
+    public Map<ApplicationId, AppCollectorData> getRegisteringCollectors() {
+      return null;
+    }
+
+    @Override public Map<ApplicationId, AppCollectorData> getKnownCollectors() {
       return null;
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index f575961..1256cad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -78,8 +78,10 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
@@ -314,7 +316,7 @@ public class ApplicationMasterService extends AbstractService implements
 
     // Remove collector address when app get finished.
     if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
-      rmApp.removeCollectorAddr();
+      ((RMAppImpl) rmApp).removeCollectorData();
     }
     // checking whether the app exits in RMStateStore at first not to throw
     // ApplicationDoesNotExistInCacheException before and after
@@ -579,8 +581,10 @@ public class ApplicationMasterService extends AbstractService implements
 
       // add collector address for this application
       if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
-        allocateResponse.setCollectorAddr(
-            this.rmContext.getRMApps().get(applicationId).getCollectorAddr());
+        AppCollectorData data = app.getCollectorData();
+        if (data != null) {
+          allocateResponse.setCollectorAddr(data.getCollectorAddr());
+        }
       }
 
       // add preemption to the allocateResponse message (if any)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c186314/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 9d480f3..997da2d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -63,13 +64,14 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequ
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppCollectorUpdateEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -117,6 +119,8 @@ public class ResourceTrackerService extends AbstractService implements
   private boolean isDelegatedCentralizedNodeLabelsConf;
   private DynamicResourceConfiguration drConf;
 
+  private final AtomicLong timelineCollectorVersion = new AtomicLong(0);
+
   public ResourceTrackerService(RMContext rmContext,
       NodesListManager nodesListManager,
       NMLivelinessMonitor nmLivelinessMonitor,
@@ -520,9 +524,6 @@ public class ResourceTrackerService extends AbstractService implements
         YarnConfiguration.timelineServiceV2Enabled(getConfig());
     if (timelineV2Enabled) {
       // Check & update collectors info from request.
-      // TODO make sure it won't have race condition issue for AM failed over
-      // case that the older registration could possible override the newer
-      // one.
       updateAppCollectorsMap(request);
     }
 
@@ -594,14 +595,14 @@ public class ResourceTrackerService extends AbstractService implements
 
   private void setAppCollectorsMapToResponse(
       List<ApplicationId> runningApps, NodeHeartbeatResponse response) {
-    Map<ApplicationId, String> liveAppCollectorsMap = new
-        HashMap<ApplicationId, String>();
+    Map<ApplicationId, AppCollectorData> liveAppCollectorsMap = new
+        HashMap<>();
     Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps();
     // Set collectors for all running apps on this node.
     for (ApplicationId appId : runningApps) {
-      String appCollectorAddr = rmApps.get(appId).getCollectorAddr();
-      if (appCollectorAddr != null) {
-        liveAppCollectorsMap.put(appId, appCollectorAddr);
+      AppCollectorData appCollectorData = rmApps.get(appId).getCollectorData();
+      if (appCollectorData != null) {
+        liveAppCollectorsMap.put(appId, appCollectorData);
       } else {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Collector for applicaton: " + appId +
@@ -609,32 +610,43 @@ public class ResourceTrackerService extends AbstractService implements
         }
       }
     }
-    response.setAppCollectorsMap(liveAppCollectorsMap);
+    response.setAppCollectors(liveAppCollectorsMap);
   }
 
   private void updateAppCollectorsMap(NodeHeartbeatRequest request) {
-    Map<ApplicationId, String> registeredCollectorsMap =
-        request.getRegisteredCollectors();
-    if (registeredCollectorsMap != null
-        && !registeredCollectorsMap.isEmpty()) {
+    Map<ApplicationId, AppCollectorData> registeringCollectorsMap =
+        request.getRegisteringCollectors();
+    if (registeringCollectorsMap != null
+        && !registeringCollectorsMap.isEmpty()) {
       Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps();
-      for (Map.Entry<ApplicationId, String> entry:
-          registeredCollectorsMap.entrySet()) {
+      for (Map.Entry<ApplicationId, AppCollectorData> entry:
+          registeringCollectorsMap.entrySet()) {
         ApplicationId appId = entry.getKey();
-        String collectorAddr = entry.getValue();
-        if (collectorAddr != null && !collectorAddr.isEmpty()) {
+        AppCollectorData collectorData = entry.getValue();
+        if (collectorData != null) {
+          if (!collectorData.isStamped()) {
+            // Stamp the collector if we have not done so
+            collectorData.setRMIdentifier(
+                ResourceManager.getClusterTimeStamp());
+            collectorData.setVersion(
+                timelineCollectorVersion.getAndIncrement());
+          }
           RMApp rmApp = rmApps.get(appId);
           if (rmApp == null) {
             LOG.warn("Cannot update collector info because application ID: " +
                 appId + " is not found in RMContext!");
           } else {
-            String previousCollectorAddr = rmApp.getCollectorAddr();
-            if (previousCollectorAddr == null
-                || !previousCollectorAddr.equals(collectorAddr)) {
-              // sending collector update event.
-              RMAppCollectorUpdateEvent event =
-                  new RMAppCollectorUpdateEvent(appId, collectorAddr);
-              rmContext.getDispatcher().getEventHandler().handle(event);
+            AppCollectorData previousCollectorData = rmApp.getCollectorData();
+            if (AppCollectorData.happensBefore(previousCollectorData,
+                collectorData)) {
+              // Sending collector update event.
+              // Note: RM has to store the newly received collector data
+              // synchronously. Otherwise, the RM may send out stale collector
+              // data before this update is done, and the RM then crashes, the
+              // newly updated collector data will get lost.
+              LOG.info("Update collector information for application " + appId
+                  + " with new address: " + collectorData.getCollectorAddr());
+              ((RMAppImpl) rmApp).setCollectorData(collectorData);
             }
           }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org