You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ro...@apache.org on 2017/08/11 07:09:47 UTC

[1/2] hadoop git commit: YARN-6130. [ATSv2 Security] Generate a delegation token for AM when app collector is created and pass it to AM via NM and RM. Contributed by Varun Saxena.

Repository: hadoop
Updated Branches:
  refs/heads/YARN-5355_branch2 e2ffa0f51 -> 798069390


http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index 3234d6f..f826631 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -239,6 +240,11 @@ public abstract class MockAsm extends MockApps {
     public boolean isAppInCompletedStates() {
       throw new UnsupportedOperationException("Not supported yet.");
     }
+
+    @Override
+    public CollectorInfo getCollectorInfo() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
   }
 
   public static RMApp newApplication(int i) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index 9365e54..17cafef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -321,13 +322,13 @@ public class MockRMApp implements RMApp {
     return false;
   }
 
-  public String getCollectorAddr() {
+  @Override
+  public AppCollectorData getCollectorData() {
     throw new UnsupportedOperationException("Not supported yet.");
   }
 
   @Override
-  public AppCollectorData getCollectorData() {
+  public CollectorInfo getCollectorInfo() {
     throw new UnsupportedOperationException("Not supported yet.");
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
index 07058f6..eb4381d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
@@ -80,7 +80,7 @@ public class TestTimelineServiceClientIntegration {
       auxService =
           PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
               collectorManager, conf);
-      auxService.addApplication(ApplicationId.newInstance(0, 1));
+      auxService.addApplication(ApplicationId.newInstance(0, 1), "user");
     } catch (ExitUtil.ExitException e) {
       fail();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java
index da76958..84d892d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java
@@ -23,7 +23,10 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.BufferedReader;
@@ -40,6 +43,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -51,10 +55,12 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.client.api.TimelineV2Client;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
 import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
 import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer;
+import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
 import org.apache.hadoop.yarn.server.timelineservice.collector.NodeTimelineCollectorManager;
 import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
 import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
@@ -76,7 +82,6 @@ public class TestTimelineAuthFilterForV2 {
 
   private static final String FOO_USER = "foo";
   private static final String HTTP_USER = "HTTP";
-
   private static final File TEST_ROOT_DIR = new File(
       System.getProperty("test.build.dir", "target" + File.separator +
           "test-dir"), UUID.randomUUID().toString());
@@ -88,21 +93,35 @@ public class TestTimelineAuthFilterForV2 {
   private static String httpSpnegoPrincipal = KerberosTestUtils.
       getServerPrincipal();
 
+  // First param indicates whether access is over HTTPS or plain HTTP; second
+  // param indicates whether Kerberos login or token-based access is used.
   @Parameterized.Parameters
-  public static Collection<Object[]> withSsl() {
-    return Arrays.asList(new Object[][] {{false}, {true}});
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][] {{false, true}, {false, false},
+        {true, false}, {true, true}});
   }
 
   private static MiniKdc testMiniKDC;
   private static String keystoresDir;
   private static String sslConfDir;
   private static Configuration conf;
+  private static UserGroupInformation nonKerberosUser;
+  static {
+    try {
+      nonKerberosUser = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {}
+  }
+  // Indicates whether HTTPS or HTTP access.
   private boolean withSsl;
+  // Indicates whether Kerberos-based login is used; when false, access is
+  // token-based instead.
+  private boolean withKerberosLogin;
   private NodeTimelineCollectorManager collectorManager;
   private PerNodeTimelineCollectorsAuxService auxService;
-
-  public TestTimelineAuthFilterForV2(boolean withSsl) {
+  public TestTimelineAuthFilterForV2(boolean withSsl,
+      boolean withKerberosLogin) {
     this.withSsl = withSsl;
+    this.withKerberosLogin = withKerberosLogin;
   }
 
   @BeforeClass
@@ -143,8 +162,6 @@ public class TestTimelineAuthFilterForV2 {
       conf.set("hadoop.proxyuser.HTTP.hosts", "*");
       conf.set("hadoop.proxyuser.HTTP.users", FOO_USER);
       UserGroupInformation.setConfiguration(conf);
-      SecurityUtil.login(conf, YarnConfiguration.TIMELINE_SERVICE_KEYTAB,
-          YarnConfiguration.TIMELINE_SERVICE_PRINCIPAL, "localhost");
     } catch (Exception e) {
       fail("Couldn't setup TimelineServer V2.");
     }
@@ -166,9 +183,27 @@ public class TestTimelineAuthFilterForV2 {
       conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY,
           HttpConfig.Policy.HTTP_ONLY.name());
     }
+    UserGroupInformation.setConfiguration(conf);
     collectorManager = new DummyNodeTimelineCollectorManager();
-    auxService = PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
-        collectorManager, conf);
+    auxService = PerNodeTimelineCollectorsAuxService.launchServer(
+        new String[0], collectorManager, conf);
+    if (withKerberosLogin) {
+      SecurityUtil.login(conf, YarnConfiguration.TIMELINE_SERVICE_KEYTAB,
+          YarnConfiguration.TIMELINE_SERVICE_PRINCIPAL, "localhost");
+    }
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    auxService.addApplication(
+        appId, UserGroupInformation.getCurrentUser().getUserName());
+    if (!withKerberosLogin) {
+      AppLevelTimelineCollector collector =
+          (AppLevelTimelineCollector)collectorManager.get(appId);
+      org.apache.hadoop.security.token.Token
+          <TimelineDelegationTokenIdentifier> token =
+              collector.getDelegationTokenForApp();
+      token.setService(new Text("localhost" + token.getService().toString().
+          substring(token.getService().toString().indexOf(":"))));
+      UserGroupInformation.getCurrentUser().addToken(token);
+    }
   }
 
   private TimelineV2Client createTimelineClientForUGI(ApplicationId appId) {
@@ -199,9 +234,14 @@ public class TestTimelineAuthFilterForV2 {
     }
     if (withSsl) {
       KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, sslConfDir);
-      File base = new File(BASEDIR);
-      FileUtil.fullyDelete(base);
+      FileUtil.fullyDelete(new File(BASEDIR));
+    }
+    if (withKerberosLogin) {
+      UserGroupInformation.getCurrentUser().logoutUserFromKeytab();
     }
+    // Reset the user for next run.
+    UserGroupInformation.setLoginUser(
+        UserGroupInformation.createRemoteUser(nonKerberosUser.getUserName()));
   }
 
   private static TimelineEntity createEntity(String id, String type) {
@@ -241,35 +281,44 @@ public class TestTimelineAuthFilterForV2 {
     }
   }
 
+  private void publishAndVerifyEntity(ApplicationId appId, File entityTypeDir,
+      String entityType) throws Exception {
+    TimelineV2Client client = createTimelineClientForUGI(appId);
+    try {
+    // Sync call. Results available immediately.
+      client.putEntities(createEntity("entity1", entityType));
+      assertEquals(1, entityTypeDir.listFiles().length);
+      verifyEntity(entityTypeDir, "entity1", entityType);
+      // Async call.
+      client.putEntitiesAsync(createEntity("entity2", entityType));
+    } finally {
+      client.stop();
+    }
+  }
+
   @Test
   public void testPutTimelineEntities() throws Exception {
-    final ApplicationId appId = ApplicationId.newInstance(0, 1);
-    auxService.addApplication(appId);
     final String entityType = "dummy_type";
+    final ApplicationId appId = ApplicationId.newInstance(0, 1);
     final File entityTypeDir = new File(TEST_ROOT_DIR.getAbsolutePath() +
         File.separator + "entities" + File.separator +
-        YarnConfiguration.DEFAULT_RM_CLUSTER_ID + File.separator + "test_user" +
+        YarnConfiguration.DEFAULT_RM_CLUSTER_ID + File.separator +
+        UserGroupInformation.getCurrentUser().getUserName() +
         File.separator + "test_flow_name" + File.separator +
         "test_flow_version" + File.separator + "1" + File.separator +
         appId.toString() + File.separator + entityType);
     try {
-      KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<Void>() {
-        @Override
-        public Void call() throws Exception {
-          TimelineV2Client client = createTimelineClientForUGI(appId);
-          try {
-            // Sync call. Results available immediately.
-            client.putEntities(createEntity("entity1", entityType));
-            assertEquals(1, entityTypeDir.listFiles().length);
-            verifyEntity(entityTypeDir, "entity1", entityType);
-            // Async call.
-            client.putEntitiesAsync(createEntity("entity2", entityType));
+      if (withKerberosLogin) {
+        KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            publishAndVerifyEntity(appId, entityTypeDir, entityType);
             return null;
-          } finally {
-            client.stop();
           }
-        }
-      });
+        });
+      } else {
+        publishAndVerifyEntity(appId, entityTypeDir, entityType);
+      }
       // Wait for async entity to be published.
       for (int i = 0; i < 50; i++) {
         if (entityTypeDir.listFiles().length == 2) {
@@ -279,6 +328,11 @@ public class TestTimelineAuthFilterForV2 {
       }
       assertEquals(2, entityTypeDir.listFiles().length);
       verifyEntity(entityTypeDir, "entity2", entityType);
+      AppLevelTimelineCollector collector =
+          (AppLevelTimelineCollector)collectorManager.get(appId);
+      auxService.removeApplication(appId);
+      verify(collectorManager.getTokenManagerService()).cancelToken(
+          eq(collector.getDelegationTokenForApp()), any(String.class));
     } finally {
       FileUtils.deleteQuietly(entityTypeDir);
     }
@@ -291,12 +345,19 @@ public class TestTimelineAuthFilterForV2 {
     }
 
     @Override
+    protected TimelineV2DelegationTokenSecretManagerService
+        createTokenManagerService() {
+      return spy(new TimelineV2DelegationTokenSecretManagerService());
+    }
+
+    @Override
     protected CollectorNodemanagerProtocol getNMCollectorService() {
       CollectorNodemanagerProtocol protocol =
           mock(CollectorNodemanagerProtocol.class);
       try {
         GetTimelineCollectorContextResponse response =
-            GetTimelineCollectorContextResponse.newInstance("test_user",
+            GetTimelineCollectorContextResponse.newInstance(
+                UserGroupInformation.getCurrentUser().getUserName(),
                 "test_flow_name", "test_flow_version", 1L);
         when(protocol.getTimelineCollectorContext(any(
             GetTimelineCollectorContextRequest.class))).thenReturn(response);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
index c481dbe..13426b9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
@@ -24,9 +24,12 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
@@ -41,13 +44,20 @@ public class AppLevelTimelineCollector extends TimelineCollector {
   private static final Log LOG = LogFactory.getLog(TimelineCollector.class);
 
   private final ApplicationId appId;
+  private final String appUser;
   private final TimelineCollectorContext context;
   private UserGroupInformation currentUser;
+  private Token<TimelineDelegationTokenIdentifier> delegationTokenForApp;
 
   public AppLevelTimelineCollector(ApplicationId appId) {
+    this(appId, null);
+  }
+
+  public AppLevelTimelineCollector(ApplicationId appId, String user) {
     super(AppLevelTimelineCollector.class.getName() + " - " + appId.toString());
     Preconditions.checkNotNull(appId, "AppId shouldn't be null");
     this.appId = appId;
+    this.appUser = user;
     context = new TimelineCollectorContext();
   }
 
@@ -55,6 +65,20 @@ public class AppLevelTimelineCollector extends TimelineCollector {
     return currentUser;
   }
 
+  public String getAppUser() {
+    return appUser;
+  }
+
+  void setDelegationTokenForApp(
+      Token<TimelineDelegationTokenIdentifier> token) {
+    this.delegationTokenForApp = token;
+  }
+
+  @VisibleForTesting
+  public Token<TimelineDelegationTokenIdentifier> getDelegationTokenForApp() {
+    return this.delegationTokenForApp;
+  }
+
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     context.setClusterId(conf.get(YarnConfiguration.RM_CLUSTER_ID,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollectorWithAgg.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollectorWithAgg.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollectorWithAgg.java
index ac91275..6c0d693 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollectorWithAgg.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollectorWithAgg.java
@@ -56,8 +56,8 @@ public class AppLevelTimelineCollectorWithAgg
   private ScheduledThreadPoolExecutor appAggregationExecutor;
   private AppLevelAggregator appAggregator;
 
-  public AppLevelTimelineCollectorWithAgg(ApplicationId appId) {
-    super(appId);
+  public AppLevelTimelineCollectorWithAgg(ApplicationId appId, String user) {
+    super(appId, user);
   }
 
   private static Set<String> initializeSkipSet() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
index 50ebb0f..cad993d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
@@ -30,14 +30,17 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.http.HttpServer2;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
 import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
@@ -71,6 +74,8 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
 
   private final boolean runningAsAuxService;
 
+  private UserGroupInformation loginUGI;
+
   static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager";
 
   @VisibleForTesting
@@ -85,25 +90,40 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    tokenMgrService = new TimelineV2DelegationTokenSecretManagerService();
+    tokenMgrService = createTokenManagerService();
     addService(tokenMgrService);
+    this.loginUGI = UserGroupInformation.getCurrentUser();
     super.serviceInit(conf);
   }
 
   @Override
   protected void serviceStart() throws Exception {
-    if (UserGroupInformation.isSecurityEnabled() && !runningAsAuxService) {
+    if (UserGroupInformation.isSecurityEnabled()) {
       // Do security login for cases where collector is running outside NM.
-      try {
-        doSecureLogin();
-      } catch(IOException ie) {
-        throw new YarnRuntimeException("Failed to login", ie);
+      if (!runningAsAuxService) {
+        try {
+          doSecureLogin();
+        } catch(IOException ie) {
+          throw new YarnRuntimeException("Failed to login", ie);
+        }
       }
+      this.loginUGI = UserGroupInformation.getLoginUser();
     }
     super.serviceStart();
     startWebApp();
   }
 
+  protected TimelineV2DelegationTokenSecretManagerService
+      createTokenManagerService() {
+    return new TimelineV2DelegationTokenSecretManagerService();
+  }
+
+  @VisibleForTesting
+  public TimelineV2DelegationTokenSecretManagerService
+      getTokenManagerService() {
+    return tokenMgrService;
+  }
+
   private void doSecureLogin() throws IOException {
     Configuration conf = getConfig();
     InetSocketAddress addr = NetUtils.createSocketAddr(conf.getTrimmed(
@@ -122,13 +142,45 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
     super.serviceStop();
   }
 
+  @VisibleForTesting
+  public Token<TimelineDelegationTokenIdentifier> generateTokenForAppCollector(
+      String user) {
+    Token<TimelineDelegationTokenIdentifier> token  = tokenMgrService.
+        generateToken(UserGroupInformation.createRemoteUser(user),
+            loginUGI.getShortUserName());
+    token.setService(new Text(timelineRestServerBindAddress));
+    return token;
+  }
+
+  @VisibleForTesting
+  public void cancelTokenForAppCollector(
+      AppLevelTimelineCollector appCollector) throws IOException {
+    if (appCollector.getDelegationTokenForApp() != null) {
+      tokenMgrService.cancelToken(appCollector.getDelegationTokenForApp(),
+          appCollector.getAppUser());
+    }
+  }
+
   @Override
   protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
     try {
       // Get context info from NM
       updateTimelineCollectorContext(appId, collector);
+      // Generate token for app collector.
+      org.apache.hadoop.yarn.api.records.Token token = null;
+      if (UserGroupInformation.isSecurityEnabled() &&
+          collector instanceof AppLevelTimelineCollector) {
+        AppLevelTimelineCollector appCollector =
+            (AppLevelTimelineCollector)collector;
+        Token<TimelineDelegationTokenIdentifier> timelineToken =
+            generateTokenForAppCollector(appCollector.getAppUser());
+        appCollector.setDelegationTokenForApp(timelineToken);
+        token = org.apache.hadoop.yarn.api.records.Token.newInstance(
+            timelineToken.getIdentifier(), timelineToken.getKind().toString(),
+            timelineToken.getPassword(), timelineToken.getService().toString());
+      }
       // Report to NM if a new collector is added.
-      reportNewCollectorToNM(appId);
+      reportNewCollectorToNM(appId, token);
     } catch (YarnException | IOException e) {
       // throw exception here as it cannot be used if failed communicate with NM
       LOG.error("Failed to communicate with NM Collector Service for " + appId);
@@ -136,6 +188,18 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
     }
   }
 
+  @Override
+  protected void postRemove(ApplicationId appId, TimelineCollector collector) {
+    if (collector instanceof AppLevelTimelineCollector) {
+      try {
+        cancelTokenForAppCollector((AppLevelTimelineCollector)collector);
+      } catch (IOException e) {
+        LOG.warn("Failed to cancel token for app collector with appId " +
+            appId, e);
+      }
+    }
+  }
+
   /**
    * Launch the REST web server for this collector manager.
    */
@@ -180,11 +244,12 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
         timelineRestServerBindAddress);
   }
 
-  private void reportNewCollectorToNM(ApplicationId appId)
+  private void reportNewCollectorToNM(ApplicationId appId,
+      org.apache.hadoop.yarn.api.records.Token token)
       throws YarnException, IOException {
     ReportNewCollectorInfoRequest request =
         ReportNewCollectorInfoRequest.newInstance(appId,
-            this.timelineRestServerBindAddress);
+            this.timelineRestServerBindAddress, token);
     LOG.info("Report a new collector for application: " + appId +
         " to the NM Collector Service.");
     getNMCollectorService().reportNewCollectorInfo(request);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
index 669e752..cb48e72 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
@@ -114,11 +114,12 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
    * exists, no new service is created.
    *
    * @param appId Application Id to be added.
+   * @param user Application Master container user.
    * @return whether it was added successfully
    */
-  public boolean addApplication(ApplicationId appId) {
+  public boolean addApplication(ApplicationId appId, String user) {
     AppLevelTimelineCollector collector =
-        new AppLevelTimelineCollectorWithAgg(appId);
+        new AppLevelTimelineCollectorWithAgg(appId, user);
     return (collectorManager.putIfAbsent(appId, collector)
         == collector);
   }
@@ -147,7 +148,7 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
     if (context.getContainerType() == ContainerType.APPLICATION_MASTER) {
       ApplicationId appId = context.getContainerId().
           getApplicationAttemptId().getApplicationId();
-      addApplication(appId);
+      addApplication(appId, context.getUser());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/security/TimelineV2DelegationTokenSecretManagerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/security/TimelineV2DelegationTokenSecretManagerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/security/TimelineV2DelegationTokenSecretManagerService.java
index eef8436..de7db58 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/security/TimelineV2DelegationTokenSecretManagerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/security/TimelineV2DelegationTokenSecretManagerService.java
@@ -18,8 +18,13 @@
 
 package org.apache.hadoop.yarn.server.timelineservice.security;
 
+import java.io.IOException;
+
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.timeline.security.TimelineDelgationTokenSecretManagerService;
@@ -43,6 +48,17 @@ public class TimelineV2DelegationTokenSecretManagerService extends
         tokenMaxLifetime, tokenRenewInterval, tokenRemovalScanInterval);
   }
 
+  public Token<TimelineDelegationTokenIdentifier> generateToken(
+      UserGroupInformation ugi, String renewer) {
+    return ((TimelineV2DelegationTokenSecretManager)
+        getTimelineDelegationTokenSecretManager()).generateToken(ugi, renewer);
+  }
+
+  public void cancelToken(Token<TimelineDelegationTokenIdentifier> token,
+      String canceller) throws IOException {
+    getTimelineDelegationTokenSecretManager().cancelToken(token, canceller);
+  }
+
   /**
    * Delegation token secret manager for ATSv2.
    */
@@ -70,6 +86,21 @@ public class TimelineV2DelegationTokenSecretManagerService extends
           delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
     }
 
+    public Token<TimelineDelegationTokenIdentifier> generateToken(
+        UserGroupInformation ugi, String renewer) {
+      Text realUser = null;
+      if (ugi.getRealUser() != null) {
+        realUser = new Text(ugi.getRealUser().getUserName());
+      }
+      TimelineDelegationTokenIdentifier identifier = createIdentifier();
+      identifier.setOwner(new Text(ugi.getUserName()));
+      identifier.setRenewer(new Text(renewer));
+      identifier.setRealUser(realUser);
+      byte[] password = createPassword(identifier);
+      return new Token<TimelineDelegationTokenIdentifier>(identifier.getBytes(),
+          password, identifier.getKind(), null);
+    }
+
     @Override
     public TimelineDelegationTokenIdentifier createIdentifier() {
       return new TimelineDelegationTokenIdentifier();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java
index a59f8c1..af9acce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java
@@ -95,7 +95,7 @@ public class TestNMTimelineCollectorManager {
       Callable<Boolean> task = new Callable<Boolean>() {
         public Boolean call() {
           AppLevelTimelineCollector collector =
-              new AppLevelTimelineCollectorWithAgg(appId);
+              new AppLevelTimelineCollectorWithAgg(appId, "user");
           return (collectorManager.putIfAbsent(appId, collector) == collector);
         }
       };
@@ -126,7 +126,7 @@ public class TestNMTimelineCollectorManager {
       Callable<Boolean> task = new Callable<Boolean>() {
         public Boolean call() {
           AppLevelTimelineCollector collector =
-              new AppLevelTimelineCollectorWithAgg(appId);
+              new AppLevelTimelineCollectorWithAgg(appId, "user");
           boolean successPut =
               (collectorManager.putIfAbsent(appId, collector) == collector);
           return successPut && collectorManager.remove(appId);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/2] hadoop git commit: YARN-6130. [ATSv2 Security] Generate a delegation token for AM when app collector is created and pass it to AM via NM and RM. Contributed by Varun Saxena.

Posted by ro...@apache.org.
YARN-6130. [ATSv2 Security] Generate a delegation token for AM when app collector is created and pass it to AM via NM and RM. Contributed by Varun Saxena.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/79806939
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/79806939
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/79806939

Branch: refs/heads/YARN-5355_branch2
Commit: 798069390739ab6971ca038aad4cd0adc4b9855a
Parents: e2ffa0f
Author: Rohith Sharma K S <ro...@apache.org>
Authored: Fri Aug 11 12:35:35 2017 +0530
Committer: Rohith Sharma K S <ro...@apache.org>
Committed: Fri Aug 11 12:35:35 2017 +0530

----------------------------------------------------------------------
 .../v2/app/rm/RMContainerAllocator.java         |   9 +-
 .../app/local/TestLocalContainerAllocator.java  |   2 +-
 .../api/protocolrecords/AllocateResponse.java   |  92 +++++++++---
 .../hadoop/yarn/api/records/CollectorInfo.java  |  55 +++++++
 .../src/main/proto/yarn_protos.proto            |   5 +
 .../src/main/proto/yarn_service_protos.proto    |   2 +-
 .../api/async/impl/AMRMClientAsyncImpl.java     |   6 +-
 .../ApplicationMasterServiceProtoTestBase.java  |  72 +++++++++
 .../hadoop/yarn/client/ProtocolHATestBase.java  |  22 ++-
 ...ationMasterServiceProtocolForTimelineV2.java |  71 +++++++++
 ...estApplicationMasterServiceProtocolOnHA.java |  46 +-----
 .../api/async/impl/TestAMRMClientAsync.java     |   4 +-
 .../impl/pb/AllocateResponsePBImpl.java         |  37 ++++-
 .../records/impl/pb/CollectorInfoPBImpl.java    | 148 +++++++++++++++++++
 .../hadoop/yarn/api/TestPBImplRecords.java      |   2 +
 .../ReportNewCollectorInfoRequest.java          |   5 +-
 .../impl/pb/NodeHeartbeatRequestPBImpl.java     |  25 +++-
 .../impl/pb/NodeHeartbeatResponsePBImpl.java    |  21 ++-
 .../pb/ReportNewCollectorInfoRequestPBImpl.java |   4 +-
 .../server/api/records/AppCollectorData.java    |  27 +++-
 .../records/impl/pb/AppCollectorDataPBImpl.java |  29 +++-
 .../yarn_server_common_service_protos.proto     |   2 +
 .../java/org/apache/hadoop/yarn/TestRPC.java    |  30 +++-
 .../hadoop/yarn/TestYarnServerApiClasses.java   |   4 +-
 .../nodemanager/NodeStatusUpdaterImpl.java      |   1 -
 .../application/ApplicationImpl.java            |   2 +-
 .../amrmproxy/MockResourceManagerFacade.java    |   2 +-
 .../ApplicationMasterService.java               |  10 +-
 .../server/resourcemanager/rmapp/RMApp.java     |  15 +-
 .../server/resourcemanager/rmapp/RMAppImpl.java |  10 +-
 .../applicationsmanager/MockAsm.java            |   6 +
 .../server/resourcemanager/rmapp/MockRMApp.java |   7 +-
 .../TestTimelineServiceClientIntegration.java   |   2 +-
 .../security/TestTimelineAuthFilterForV2.java   | 121 +++++++++++----
 .../collector/AppLevelTimelineCollector.java    |  24 +++
 .../AppLevelTimelineCollectorWithAgg.java       |   4 +-
 .../collector/NodeTimelineCollectorManager.java |  83 +++++++++--
 .../PerNodeTimelineCollectorsAuxService.java    |   7 +-
 ...neV2DelegationTokenSecretManagerService.java |  31 ++++
 .../TestNMTimelineCollectorManager.java         |   4 +-
 40 files changed, 887 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 218e218..d681940 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -860,13 +860,16 @@ public class RMContainerAllocator extends RMContainerRequestor
     handleUpdatedNodes(response);
     handleJobPriorityChange(response);
     // handle receiving the timeline collector address for this app
-    String collectorAddr = response.getCollectorAddr();
+    String collectorAddr = null;
+    if (response.getCollectorInfo() != null) {
+      collectorAddr = response.getCollectorInfo().getCollectorAddr();
+    }
+
     MRAppMaster.RunningAppContext appContext =
         (MRAppMaster.RunningAppContext)this.getContext();
     if (collectorAddr != null && !collectorAddr.isEmpty()
         && appContext.getTimelineV2Client() != null) {
-      appContext.getTimelineV2Client().setTimelineServiceAddress(
-          response.getCollectorAddr());
+      appContext.getTimelineV2Client().setTimelineServiceAddress(collectorAddr);
     }
 
     for (ContainerStatus cont : finishedContainers) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
index 3fa0043..9549b68 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
@@ -297,7 +297,7 @@ public class TestLocalContainerAllocator {
           Resources.none(), null, 1, null,
           Collections.<NMToken>emptyList(),
           yarnToken,
-          Collections.<UpdatedContainer>emptyList());
+          Collections.<UpdatedContainer>emptyList(), null);
       response.setApplicationPriority(Priority.newInstance(0));
       return response;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
index c369c3c..b2b40a0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
@@ -96,7 +97,8 @@ public abstract class AllocateResponse {
 
   /**
    * Use {@link AllocateResponse#newInstance(int, List, List, List, Resource,
-   * AMCommand, int, PreemptionMessage, List, Token, List)} instead
+   * AMCommand, int, PreemptionMessage, List, Token, List, CollectorInfo)}
+   * instead.
    * @param responseId responseId
    * @param completedContainers completedContainers
    * @param allocatedContainers allocatedContainers
@@ -117,10 +119,14 @@ public abstract class AllocateResponse {
       Resource availResources, AMCommand command, int numClusterNodes,
       PreemptionMessage preempt, List<NMToken> nmTokens,
       List<ContainerResourceIncrease> increasedContainers,
-      List<ContainerResourceDecrease> decreasedContainers) {
-    return newInstance(responseId, completedContainers, allocatedContainers,
-        updatedNodes, availResources, command, numClusterNodes, preempt,
-        nmTokens);
+      List<ContainerResourceDecrease> decreasedContainers,
+      CollectorInfo collectorInfo) {
+    return AllocateResponse.newBuilder().responseId(responseId)
+        .completedContainersStatuses(completedContainers)
+        .allocatedContainers(allocatedContainers)
+        .updatedNodes(updatedNodes).availableResources(availResources)
+        .amCommand(command).nmTokens(nmTokens).collectorInfo(collectorInfo)
+        .build();
   }
 
   @Public
@@ -147,14 +153,15 @@ public abstract class AllocateResponse {
       List<Container> allocatedContainers, List<NodeReport> updatedNodes,
       Resource availResources, AMCommand command, int numClusterNodes,
       PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken,
-      List<UpdatedContainer> updatedContainers) {
+      List<UpdatedContainer> updatedContainers, CollectorInfo collectorInfo) {
     return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes)
         .responseId(responseId)
         .completedContainersStatuses(completedContainers)
         .allocatedContainers(allocatedContainers).updatedNodes(updatedNodes)
         .availableResources(availResources).amCommand(command)
         .preemptionMessage(preempt).nmTokens(nmTokens)
-        .updatedContainers(updatedContainers).amRmToken(amRMToken).build();
+        .updatedContainers(updatedContainers).amRmToken(amRMToken)
+        .collectorInfo(collectorInfo).build();
   }
 
   /**
@@ -346,6 +353,20 @@ public abstract class AllocateResponse {
   public abstract void setApplicationPriority(Priority priority);
 
   /**
+   * The data associated with the collector that belongs to this app. Contains
+   * address and token along with identification information.
+   *
+   * @return The data of the collector that belongs to this attempt.
+   */
+  @Public
+  @Unstable
+  public abstract CollectorInfo getCollectorInfo();
+
+  @Private
+  @Unstable
+  public abstract void setCollectorInfo(CollectorInfo info);
+
+  /**
    * Get the list of container update errors to inform the
    * Application Master about the container updates that could not be
    * satisfied due to error.
@@ -544,6 +565,50 @@ public abstract class AllocateResponse {
     }
 
     /**
+     * Set the <code>applicationPriority</code> of the response.
+     * @see AllocateResponse#setApplicationPriority(Priority)
+     * @param applicationPriority
+     *     <code>applicationPriority</code> of the response
+     * @return {@link AllocateResponseBuilder}
+     */
+    @Private
+    @Unstable
+    public AllocateResponseBuilder applicationPriority(
+        Priority applicationPriority) {
+      allocateResponse.setApplicationPriority(applicationPriority);
+      return this;
+    }
+
+    /**
+     * Set the <code>collectorInfo</code> of the response.
+     * @see AllocateResponse#setCollectorInfo(CollectorInfo)
+     * @param collectorInfo <code>collectorInfo</code> of the response which
+     *    contains the collector address and the collector token.
+     * @return {@link AllocateResponseBuilder}
+     */
+    @Private
+    @Unstable
+    public AllocateResponseBuilder collectorInfo(
+        CollectorInfo collectorInfo) {
+      allocateResponse.setCollectorInfo(collectorInfo);
+      return this;
+    }
+
+    /**
+     * Set the <code>updateErrors</code> of the response.
+     * @see AllocateResponse#setUpdateErrors(List)
+     * @param updateErrors <code>updateErrors</code> of the response
+     * @return {@link AllocateResponseBuilder}
+     */
+    @Private
+    @Unstable
+    public AllocateResponseBuilder updateErrors(
+        List<UpdateContainerError> updateErrors) {
+      allocateResponse.setUpdateErrors(updateErrors);
+      return this;
+    }
+
+    /**
      * Return generated {@link AllocateResponse} object.
      * @return {@link AllocateResponse}
      */
@@ -568,17 +633,4 @@ public abstract class AllocateResponse {
   @Deprecated
   public abstract List<ContainerResourceDecrease> getDecreasedContainers();
 
-  /**
-   * The address of collector that belong to this app
-   *
-   * @return The address of collector that belong to this attempt
-   */
-  @Public
-  @Unstable
-  public abstract String getCollectorAddr();
-
-  @Private
-  @Unstable
-  public abstract void setCollectorAddr(String collectorAddr);
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java
new file mode 100644
index 0000000..960c992
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Collector info containing collector address and collector token passed from
+ * RM to AM in Allocate Response.
+ */
+@Private
+@InterfaceStability.Unstable
+public abstract class CollectorInfo {
+
+  protected static final long DEFAULT_TIMESTAMP_VALUE = -1;
+
+  public static CollectorInfo newInstance(String collectorAddr, Token token) {
+    CollectorInfo amCollectorInfo =
+        Records.newRecord(CollectorInfo.class);
+    amCollectorInfo.setCollectorAddr(collectorAddr);
+    amCollectorInfo.setCollectorToken(token);
+    return amCollectorInfo;
+  }
+
+  public abstract String getCollectorAddr();
+
+  public abstract void setCollectorAddr(String addr);
+
+  /**
+   * Get delegation token for app collector which AM will use to publish
+   * entities.
+   * @return the delegation token for app collector.
+   */
+  public abstract Token getCollectorToken();
+
+  public abstract void setCollectorToken(Token token);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 4af5a97..59d9141 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -635,3 +635,8 @@ message ContainerResourceDecreaseProto {
   optional ContainerIdProto container_id = 1;
   optional ResourceProto capability = 2;
 }
+
+message CollectorInfoProto {
+  optional string collector_addr = 1;
+  optional hadoop.common.TokenProto collector_token = 2;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index 2a668bd..a212a7f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -114,7 +114,7 @@ message AllocateResponseProto {
   repeated ContainerResourceDecreaseProto decreased_containers = 11;
   optional hadoop.common.TokenProto am_rm_token = 12;
   optional PriorityProto application_priority = 13;
-  optional string collector_addr = 14;
+  optional CollectorInfoProto collector_info = 14;
   repeated UpdateContainerErrorProto update_errors = 15;
   repeated UpdatedContainerProto updated_containers = 16;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index 8b2557c..eb94b28 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -327,7 +327,11 @@ extends AMRMClientAsync<T> {
 
           AllocateResponse response = (AllocateResponse) object;
 
-          String collectorAddress = response.getCollectorAddr();
+          String collectorAddress = null;
+          if (response.getCollectorInfo() != null) {
+            collectorAddress = response.getCollectorInfo().getCollectorAddr();
+          }
+
           TimelineV2Client timelineClient =
               client.getRegisteredTimelineV2Client();
           if (timelineClient != null && collectorAddress != null

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ApplicationMasterServiceProtoTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ApplicationMasterServiceProtoTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ApplicationMasterServiceProtoTestBase.java
new file mode 100644
index 0000000..4521018
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ApplicationMasterServiceProtoTestBase.java
@@ -0,0 +1,72 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.junit.After;
+
+/**
+ * Test Base for Application Master Service Protocol.
+ */
+public abstract class ApplicationMasterServiceProtoTestBase
+    extends ProtocolHATestBase {
+
+  private ApplicationMasterProtocol amClient;
+  private ApplicationAttemptId attemptId;
+
+  protected void startupHAAndSetupClient() throws Exception {
+    attemptId = this.cluster.createFakeApplicationAttemptId();
+
+    Token<AMRMTokenIdentifier> appToken =
+        this.cluster.getResourceManager().getRMContext()
+          .getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId);
+    appToken.setService(ClientRMProxy.getAMRMTokenService(this.conf));
+    UserGroupInformation.setLoginUser(UserGroupInformation
+        .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
+    UserGroupInformation.getCurrentUser().addToken(appToken);
+    syncToken(appToken);
+    amClient = ClientRMProxy
+        .createRMProxy(this.conf, ApplicationMasterProtocol.class);
+  }
+
+  @After
+  public void shutDown() {
+    if(this.amClient != null) {
+      RPC.stopProxy(this.amClient);
+    }
+  }
+
+  protected ApplicationMasterProtocol getAMClient() {
+    return amClient;
+  }
+
+  private void syncToken(Token<AMRMTokenIdentifier> token) throws IOException {
+    for (int i = 0; i < this.cluster.getNumOfResourceManager(); i++) {
+      this.cluster.getResourceManager(i).getRMContext()
+          .getAMRMTokenSecretManager().addPersistedPassword(token);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
index a8e9132..efb1987 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -98,6 +99,7 @@ import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@@ -804,11 +806,21 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
     }
 
     public AllocateResponse createFakeAllocateResponse() {
-      return AllocateResponse.newInstance(-1,
-          new ArrayList<ContainerStatus>(),
-          new ArrayList<Container>(), new ArrayList<NodeReport>(),
-          Resource.newInstance(1024, 2), null, 1,
-          null, new ArrayList<NMToken>());
+      if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
+        return AllocateResponse.newInstance(-1,
+            new ArrayList<ContainerStatus>(), new ArrayList<Container>(),
+            new ArrayList<NodeReport>(), Resource.newInstance(1024, 2), null, 1,
+            null, new ArrayList<NMToken>(), null,
+            new ArrayList<UpdatedContainer>(),
+            CollectorInfo.newInstance("host:port", Token.newInstance(
+            new byte[] {0}, "TIMELINE", new byte[] {0}, "rm")));
+      } else {
+        return AllocateResponse.newInstance(-1,
+            new ArrayList<ContainerStatus>(),
+            new ArrayList<Container>(), new ArrayList<NodeReport>(),
+            Resource.newInstance(1024, 2), null, 1,
+            null, new ArrayList<NMToken>());
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolForTimelineV2.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolForTimelineV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolForTimelineV2.java
new file mode 100644
index 0000000..be8c302
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolForTimelineV2.java
@@ -0,0 +1,81 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests Application Master Protocol with timeline service v2 enabled.
+ */
+public class TestApplicationMasterServiceProtocolForTimelineV2
+    extends ApplicationMasterServiceProtoTestBase {
+
+  /**
+   * Enables timeline service v2 with the file system writer, then starts the
+   * HA cluster and sets up the AM client.
+   */
+  @Before
+  public void initialize() throws Exception {
+    HATestUtil.setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE + 200, conf);
+    HATestUtil.setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE + 200, conf);
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+    conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+        FileSystemTimelineWriterImpl.class, TimelineWriter.class);
+    startHACluster(0, false, false, true);
+    super.startupHAAndSetupClient();
+  }
+
+  /**
+   * Checks that allocate returns the fake response, including the collector
+   * address and a collector token.
+   */
+  @Test(timeout = 15000)
+  public void testAllocateForTimelineV2OnHA()
+      throws YarnException, IOException {
+    AllocateRequest request = AllocateRequest.newInstance(0, 50f,
+        new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>(),
+        ResourceBlacklistRequest.newInstance(new ArrayList<String>(),
+            new ArrayList<String>()));
+    AllocateResponse response = getAMClient().allocate(request);
+    // JUnit takes (expected, actual); keep that order so a failure message
+    // labels the two values correctly.
+    Assert.assertEquals(this.cluster.createFakeAllocateResponse(), response);
+    Assert.assertNotNull(response.getCollectorInfo());
+    Assert.assertEquals("host:port",
+        response.getCollectorInfo().getCollectorAddr());
+    Assert.assertNotNull(response.getCollectorInfo().getCollectorToken());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java
index ad86fb3..c2f39a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java
@@ -23,10 +23,6 @@ import java.util.ArrayList;
 
 import org.junit.Assert;
 
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -34,45 +30,20 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRespons
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 
 public class TestApplicationMasterServiceProtocolOnHA
-    extends ProtocolHATestBase {
-  private ApplicationMasterProtocol amClient;
-  private ApplicationAttemptId attemptId ;
-
+    extends ApplicationMasterServiceProtoTestBase {
   @Before
   public void initialize() throws Exception {
     startHACluster(0, false, false, true);
-    attemptId = this.cluster.createFakeApplicationAttemptId();
-
-    Token<AMRMTokenIdentifier> appToken =
-        this.cluster.getResourceManager().getRMContext()
-          .getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId);
-    appToken.setService(ClientRMProxy.getAMRMTokenService(this.conf));
-    UserGroupInformation.setLoginUser(UserGroupInformation
-        .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
-    UserGroupInformation.getCurrentUser().addToken(appToken);
-    syncToken(appToken);
-
-    amClient = ClientRMProxy
-        .createRMProxy(this.conf, ApplicationMasterProtocol.class);
-  }
-
-  @After
-  public void shutDown() {
-    if(this.amClient != null) {
-      RPC.stopProxy(this.amClient);
-    }
+    super.startupHAAndSetupClient();
   }
 
   @Test(timeout = 15000)
@@ -81,7 +52,7 @@ public class TestApplicationMasterServiceProtocolOnHA
     RegisterApplicationMasterRequest request =
         RegisterApplicationMasterRequest.newInstance("localhost", 0, "");
     RegisterApplicationMasterResponse response =
-        amClient.registerApplicationMaster(request);
+        getAMClient().registerApplicationMaster(request);
     Assert.assertEquals(response,
         this.cluster.createFakeRegisterApplicationMasterResponse());
   }
@@ -93,7 +64,7 @@ public class TestApplicationMasterServiceProtocolOnHA
         FinishApplicationMasterRequest.newInstance(
             FinalApplicationStatus.SUCCEEDED, "", "");
     FinishApplicationMasterResponse response =
-        amClient.finishApplicationMaster(request);
+        getAMClient().finishApplicationMaster(request);
     Assert.assertEquals(response,
         this.cluster.createFakeFinishApplicationMasterResponse());
   }
@@ -105,14 +76,7 @@ public class TestApplicationMasterServiceProtocolOnHA
         new ArrayList<ContainerId>(),
         ResourceBlacklistRequest.newInstance(new ArrayList<String>(),
             new ArrayList<String>()));
-    AllocateResponse response = amClient.allocate(request);
+    AllocateResponse response = getAMClient().allocate(request);
     Assert.assertEquals(response, this.cluster.createFakeAllocateResponse());
   }
-
-  private void syncToken(Token<AMRMTokenIdentifier> token) throws IOException {
-    for (int i = 0; i < this.cluster.getNumOfResourceManager(); i++) {
-      this.cluster.getResourceManager(i).getRMContext()
-          .getAMRMTokenSecretManager().addPersistedPassword(token);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
index ba38340..9c64412 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
@@ -426,8 +426,8 @@ public class TestAMRMClientAsync {
     }
     AllocateResponse response =
         AllocateResponse.newInstance(0, completed, allocated,
-            new ArrayList<NodeReport>(), null, null, 1, null, nmTokens,
-            updatedContainers);
+            new ArrayList<NodeReport>(), null, null, 1, null, nmTokens, null,
+            updatedContainers, null);
     return response;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
index bd82016..61d16e9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
@@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.UpdateContainerError;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+import org.apache.hadoop.yarn.api.records.impl.pb.CollectorInfoPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
@@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.UpdatedContainerPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.CollectorInfoProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
@@ -82,6 +85,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
   private PreemptionMessage preempt;
   private Token amrmToken = null;
   private Priority appPriority = null;
+  private CollectorInfo collectorInfo = null;
 
   public AllocateResponsePBImpl() {
     builder = AllocateResponseProto.newBuilder();
@@ -164,6 +168,9 @@ public class AllocateResponsePBImpl extends AllocateResponse {
     if (this.amrmToken != null) {
       builder.setAmRmToken(convertToProtoFormat(this.amrmToken));
     }
+    if (this.collectorInfo != null) {
+      builder.setCollectorInfo(convertToProtoFormat(this.collectorInfo));
+    }
     if (this.appPriority != null) {
       builder.setApplicationPriority(convertToProtoFormat(this.appPriority));
     }
@@ -410,19 +417,25 @@ public class AllocateResponsePBImpl extends AllocateResponse {
 
 
   @Override
-  public synchronized String getCollectorAddr() {
+  public synchronized CollectorInfo getCollectorInfo() {
     AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
-    return p.getCollectorAddr();
+    if (this.collectorInfo != null) {
+      return this.collectorInfo;
+    }
+    if (!p.hasCollectorInfo()) {
+      return null;
+    }
+    this.collectorInfo = convertFromProtoFormat(p.getCollectorInfo());
+    return this.collectorInfo;
   }
 
   @Override
-  public synchronized void setCollectorAddr(String collectorAddr) {
+  public synchronized void setCollectorInfo(CollectorInfo info) {
     maybeInitBuilder();
-    if (collectorAddr == null) {
-      builder.clearCollectorAddr();
-      return;
+    if (info == null) {
+      builder.clearCollectorInfo();
     }
-    builder.setCollectorAddr(collectorAddr);
+    this.collectorInfo = info;
   }
 
   @Override
@@ -730,6 +743,16 @@ public class AllocateResponsePBImpl extends AllocateResponse {
     return ((NodeReportPBImpl)t).getProto();
   }
 
+  private synchronized CollectorInfoPBImpl convertFromProtoFormat(
+      CollectorInfoProto p) {
+    return new CollectorInfoPBImpl(p);
+  }
+
+  private synchronized CollectorInfoProto convertToProtoFormat(
+      CollectorInfo t) {
+    return ((CollectorInfoPBImpl)t).getProto();
+  }
+
   private synchronized ContainerPBImpl convertFromProtoFormat(
       ContainerProto p) {
     return new ContainerPBImpl(p);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CollectorInfoPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CollectorInfoPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CollectorInfoPBImpl.java
new file mode 100644
index 0000000..bb54133
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CollectorInfoPBImpl.java
@@ -0,0 +1,170 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.proto.YarnProtos.CollectorInfoProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.CollectorInfoProtoOrBuilder;
+
+import com.google.protobuf.TextFormat;
+
+/**
+ * Protocol record implementation of {@link CollectorInfo}. Values given to
+ * the setters are cached in fields and written to the builder by
+ * {@link #getProto()}.
+ */
+public class CollectorInfoPBImpl extends CollectorInfo {
+
+  private CollectorInfoProto proto = CollectorInfoProto.getDefaultInstance();
+
+  private CollectorInfoProto.Builder builder = null;
+  private boolean viaProto = false;
+
+  // Locally cached values, copied into the builder by mergeLocalToBuilder().
+  private String collectorAddr = null;
+  private Token collectorToken = null;
+
+
+  /** Creates an empty record backed by a fresh builder. */
+  public CollectorInfoPBImpl() {
+    builder = CollectorInfoProto.newBuilder();
+  }
+
+  /**
+   * Creates a record that wraps an existing proto.
+   *
+   * @param proto proto to wrap.
+   */
+  public CollectorInfoPBImpl(CollectorInfoProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  /**
+   * Merges the locally cached values into the proto and returns it.
+   *
+   * @return proto reflecting the current state of this record.
+   */
+  public CollectorInfoProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  // Makes sure a builder seeded from the current proto exists before a write.
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = CollectorInfoProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  // Copies the cached fields into the builder and rebuilds the proto.
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+
+  @Override
+  public String getCollectorAddr() {
+    CollectorInfoProtoOrBuilder p = viaProto ? proto : builder;
+    // Read from the proto/builder only when nothing is cached locally.
+    if (this.collectorAddr == null && p.hasCollectorAddr()) {
+      this.collectorAddr = p.getCollectorAddr();
+    }
+    return this.collectorAddr;
+  }
+
+  @Override
+  public void setCollectorAddr(String addr) {
+    maybeInitBuilder();
+    // Test the argument, not the cached field: when the caller passes null
+    // the builder must be cleared, otherwise the previous address stays in
+    // it and getCollectorAddr() reads it back.
+    if (addr == null) {
+      builder.clearCollectorAddr();
+    }
+    this.collectorAddr = addr;
+  }
+
+  @Override
+  public Token getCollectorToken() {
+    CollectorInfoProtoOrBuilder p = viaProto ? proto : builder;
+    // Read from the proto/builder only when nothing is cached locally.
+    if (this.collectorToken == null && p.hasCollectorToken()) {
+      this.collectorToken = convertFromProtoFormat(p.getCollectorToken());
+    }
+    return this.collectorToken;
+  }
+
+  @Override
+  public void setCollectorToken(Token token) {
+    maybeInitBuilder();
+    if (token == null) {
+      builder.clearCollectorToken();
+    }
+    this.collectorToken = token;
+  }
+
+  private TokenPBImpl convertFromProtoFormat(TokenProto p) {
+    return new TokenPBImpl(p);
+  }
+
+  private TokenProto convertToProtoFormat(Token t) {
+    return ((TokenPBImpl) t).getProto();
+  }
+
+  // Writes every non-null cached field into the builder.
+  private void mergeLocalToBuilder() {
+    if (this.collectorAddr != null) {
+      builder.setCollectorAddr(this.collectorAddr);
+    }
+    if (this.collectorToken != null) {
+      builder.setCollectorToken(convertToProtoFormat(this.collectorToken));
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
index 932e078..bc657ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
@@ -100,6 +100,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestP
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -405,6 +406,7 @@ public class TestPBImplRecords extends BasePBImplRecordsTest {
     generateByNewInstance(ApplicationTimeout.class);
     generateByNewInstance(ContainerResourceIncreaseRequest.class);
     generateByNewInstance(QueueConfigurations.class);
+    generateByNewInstance(CollectorInfo.class);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java
index 1503eca..a4c1a38 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.util.Records;
 
@@ -37,11 +38,11 @@ public abstract class ReportNewCollectorInfoRequest {
   }
 
   public static ReportNewCollectorInfoRequest newInstance(
-      ApplicationId id, String collectorAddr) {
+      ApplicationId id, String collectorAddr, Token token) {
     ReportNewCollectorInfoRequest request =
         Records.newRecord(ReportNewCollectorInfoRequest.class);
     request.setAppCollectorsList(
-        Arrays.asList(AppCollectorData.newInstance(id, collectorAddr)));
+        Arrays.asList(AppCollectorData.newInstance(id, collectorAddr, token)));
     return request;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
index 73a8abe..c07a6eb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
@@ -26,23 +26,26 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
@@ -164,9 +167,13 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
     builder.clearRegisteringCollectors();
     for (Map.Entry<ApplicationId, AppCollectorData> entry :
         registeringCollectors.entrySet()) {
+      AppCollectorData data = entry.getValue();
       builder.addRegisteringCollectors(AppCollectorDataProto.newBuilder()
           .setAppId(convertToProtoFormat(entry.getKey()))
-          .setAppCollectorAddr(entry.getValue().getCollectorAddr()));
+          .setAppCollectorAddr(data.getCollectorAddr())
+          .setAppCollectorToken(convertToProtoFormat(data.getCollectorToken()))
+          .setRmIdentifier(data.getRMIdentifier())
+          .setVersion(data.getVersion()));
     }
   }
 
@@ -267,8 +274,10 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
       this.registeringCollectors = new HashMap<>();
       for (AppCollectorDataProto c : list) {
         ApplicationId appId = convertFromProtoFormat(c.getAppId());
+        Token collectorToken = convertFromProtoFormat(c.getAppCollectorToken());
         AppCollectorData data = AppCollectorData.newInstance(appId,
-            c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion());
+            c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion(),
+            collectorToken);
         this.registeringCollectors.put(appId, data);
       }
     }
@@ -309,6 +318,14 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
     return ((MasterKeyPBImpl)t).getProto();
   }
 
+  private TokenPBImpl convertFromProtoFormat(TokenProto p) {
+    return new TokenPBImpl(p);
+  }
+
+  private TokenProto convertToProtoFormat(Token t) {
+    return ((TokenPBImpl) t).getProto();
+  }
+
   @Override
   public Set<NodeLabel> getNodeLabels() {
     initNodeLabels();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index bc4e802..3f4b4ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -26,32 +26,35 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
@@ -153,6 +156,8 @@ public class NodeHeartbeatResponsePBImpl extends
       builder.addAppCollectors(AppCollectorDataProto.newBuilder()
           .setAppId(convertToProtoFormat(entry.getKey()))
           .setAppCollectorAddr(data.getCollectorAddr())
+          .setAppCollectorToken(
+              convertToProtoFormat(entry.getValue().getCollectorToken()))
           .setRmIdentifier(data.getRMIdentifier())
           .setVersion(data.getVersion()));
     }
@@ -598,8 +603,10 @@ public class NodeHeartbeatResponsePBImpl extends
       this.appCollectorsMap = new HashMap<>();
       for (AppCollectorDataProto c : list) {
         ApplicationId appId = convertFromProtoFormat(c.getAppId());
+        Token collectorToken = convertFromProtoFormat(c.getAppCollectorToken());
         AppCollectorData data = AppCollectorData.newInstance(appId,
-            c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion());
+            c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion(),
+            collectorToken);
         this.appCollectorsMap.put(appId, data);
       }
     }
@@ -779,5 +786,13 @@ public class NodeHeartbeatResponsePBImpl extends
       SignalContainerRequest t) {
     return ((SignalContainerRequestPBImpl)t).getProto();
   }
+
+  private TokenProto convertToProtoFormat(Token t) {
+    return ((TokenPBImpl) t).getProto();
+  }
+
+  private TokenPBImpl convertFromProtoFormat(TokenProto p) {
+    return new TokenPBImpl(p);
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java
index 3f3dcf5..5ffc3a2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java
@@ -20,12 +20,12 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorDataPBImpl;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProtoOrBuilder;
 import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
-import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorDataPBImpl;
 
 public class ReportNewCollectorInfoRequestPBImpl extends
     ReportNewCollectorInfoRequest {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java
index da2e5de..5266dca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.api.records;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.util.Records;
 
 
@@ -31,20 +32,32 @@ public abstract class AppCollectorData {
   protected static final long DEFAULT_TIMESTAMP_VALUE = -1;
 
   public static AppCollectorData newInstance(
-      ApplicationId id, String collectorAddr, long rmIdentifier, long version) {
+      ApplicationId id, String collectorAddr, long rmIdentifier, long version,
+      Token token) {
     AppCollectorData appCollectorData =
         Records.newRecord(AppCollectorData.class);
     appCollectorData.setApplicationId(id);
     appCollectorData.setCollectorAddr(collectorAddr);
     appCollectorData.setRMIdentifier(rmIdentifier);
     appCollectorData.setVersion(version);
+    appCollectorData.setCollectorToken(token);
     return appCollectorData;
   }
 
+  public static AppCollectorData newInstance(
+      ApplicationId id, String collectorAddr, long rmIdentifier, long version) {
+    return newInstance(id, collectorAddr, rmIdentifier, version, null);
+  }
+
   public static AppCollectorData newInstance(ApplicationId id,
-      String collectorAddr) {
+      String collectorAddr, Token token) {
     return newInstance(id, collectorAddr, DEFAULT_TIMESTAMP_VALUE,
-        DEFAULT_TIMESTAMP_VALUE);
+        DEFAULT_TIMESTAMP_VALUE, token);
+  }
+
+  public static AppCollectorData newInstance(ApplicationId id,
+      String collectorAddr) {
+    return newInstance(id, collectorAddr, null);
   }
 
   /**
@@ -101,4 +114,12 @@ public abstract class AppCollectorData {
 
   public abstract void setVersion(long version);
 
+  /**
+   * Get delegation token for app collector which AM will use to publish
+   * entities.
+   * @return the delegation token for app collector.
+   */
+  public abstract Token getCollectorToken();
+
+  public abstract void setCollectorToken(Token token);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java
index 7d3a805..7144f51 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java
@@ -19,10 +19,11 @@ package org.apache.hadoop.yarn.server.api.records.impl.pb;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
-
+import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProtoOrBuilder;
@@ -43,6 +44,7 @@ public class AppCollectorDataPBImpl extends AppCollectorData {
   private String collectorAddr = null;
   private Long rmIdentifier = null;
   private Long version = null;
+  private Token collectorToken = null;
 
   public AppCollectorDataPBImpl() {
     builder = AppCollectorDataProto.newBuilder();
@@ -158,6 +160,24 @@ public class AppCollectorDataPBImpl extends AppCollectorData {
     builder.setVersion(version);
   }
 
+  @Override
+  public Token getCollectorToken() {
+    AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.collectorToken == null && p.hasAppCollectorToken()) {
+      this.collectorToken = new TokenPBImpl(p.getAppCollectorToken());
+    }
+    return this.collectorToken;
+  }
+
+  @Override
+  public void setCollectorToken(Token token) {
+    maybeInitBuilder();
+    if (token == null) {
+      builder.clearAppCollectorToken();
+    }
+    this.collectorToken = token;
+  }
+
   private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
     return new ApplicationIdPBImpl(p);
   }
@@ -195,6 +215,9 @@ public class AppCollectorDataPBImpl extends AppCollectorData {
     if (this.version != null) {
       builder.setVersion(this.version);
     }
+    if (this.collectorToken != null) {
+      builder.setAppCollectorToken(
+          ((TokenPBImpl)this.collectorToken).getProto());
+    }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 7dbae4e..02bb76f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -22,6 +22,7 @@ option java_generic_services = true;
 option java_generate_equals_and_hash = true;
 package hadoop.yarn;
 
+import "Security.proto";
 import "yarn_protos.proto";
 import "yarn_server_common_protos.proto";
 import "yarn_service_protos.proto";
@@ -136,6 +137,7 @@ message AppCollectorDataProto {
   optional string app_collector_addr = 2;
   optional int64 rm_identifier = 3 [default = -1];
   optional int64 version = 4 [default = -1];
+  optional hadoop.common.TokenProto app_collector_token = 5;
 }
 
 //////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
index 7eb8944..45d2cb2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
@@ -70,6 +71,7 @@ import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
 import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
@@ -91,6 +93,21 @@ public class TestRPC {
       "collectors' number in ReportNewCollectorInfoRequest is not ONE.";
 
   public static final String DEFAULT_COLLECTOR_ADDR = "localhost:0";
+  private static final Token DEFAULT_COLLECTOR_TOKEN;
+  static {
+    TimelineDelegationTokenIdentifier identifier =
+        new TimelineDelegationTokenIdentifier();
+    identifier.setOwner(new Text("user"));
+    identifier.setRenewer(new Text("user"));
+    identifier.setRealUser(new Text("user"));
+    long now = Time.now();
+    identifier.setIssueDate(now);
+    identifier.setMaxDate(now + 1000L);
+    identifier.setMasterKeyId(500);
+    identifier.setSequenceNumber(5);
+    DEFAULT_COLLECTOR_TOKEN = Token.newInstance(identifier.getBytes(),
+        identifier.getKind().toString(), identifier.getBytes(), "localhost:0");
+  }
 
   public static final ApplicationId DEFAULT_APP_ID =
       ApplicationId.newInstance(0, 0);
@@ -171,7 +188,16 @@ public class TestRPC {
     try {
       ReportNewCollectorInfoRequest request =
           ReportNewCollectorInfoRequest.newInstance(
-              DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR);
+              DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR, null);
+      proxy.reportNewCollectorInfo(request);
+    } catch (YarnException e) {
+      Assert.fail("RPC call failured is not expected here.");
+    }
+
+    try {
+      ReportNewCollectorInfoRequest request =
+          ReportNewCollectorInfoRequest.newInstance(
+              DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR, DEFAULT_COLLECTOR_TOKEN);
       proxy.reportNewCollectorInfo(request);
     } catch (YarnException e) {
       Assert.fail("RPC call failured is not expected here.");
@@ -428,6 +454,8 @@ public class TestRPC {
             DEFAULT_APP_ID);
         Assert.assertEquals(appCollector.getCollectorAddr(),
             DEFAULT_COLLECTOR_ADDR);
+        Assert.assertTrue(appCollector.getCollectorToken() == null ||
+            appCollector.getCollectorToken().equals(DEFAULT_COLLECTOR_TOKEN));
       } else {
         throw new YarnException(ILLEGAL_NUMBER_MESSAGE);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
index ffd1b7e..008f1ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
@@ -351,7 +352,8 @@ public class TestYarnServerApiClasses {
   private Map<ApplicationId, AppCollectorData> getCollectors() {
     ApplicationId appID = ApplicationId.newInstance(1L, 1);
     String collectorAddr = "localhost:0";
-    AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr);
+    AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr,
+        Token.newInstance(new byte[0], "kind", new byte[0], "s"));
     Map<ApplicationId, AppCollectorData> collectorMap =
         new HashMap<>();
     collectorMap.put(appID, data);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 7a9f82f..61f1d75 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -71,7 +71,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
-
 import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index aafb8d7..32fda01 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -44,8 +44,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
index 499a5cb..dffd19e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
@@ -305,7 +305,7 @@ public class MockResourceManagerFacade implements
         new ArrayList<ContainerStatus>(), containerList,
         new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null,
         new ArrayList<NMToken>(), newAMRMToken,
-        new ArrayList<UpdatedContainer>());
+        new ArrayList<UpdatedContainer>(), null);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 5fe90fc..57ef493 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -48,8 +48,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-
-
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -81,7 +80,6 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
@@ -640,9 +638,9 @@ public class ApplicationMasterService extends AbstractService implements
 
     // add collector address for this application
     if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
-      AppCollectorData data = app.getCollectorData();
-      if (data != null) {
-        allocateResponse.setCollectorAddr(data.getCollectorAddr());
+      CollectorInfo collectorInfo = app.getCollectorInfo();
+      if (collectorInfo != null) {
+        allocateResponse.setCollectorInfo(collectorInfo);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index 1a0b920..93c41b6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -187,14 +188,24 @@ public interface RMApp extends EventHandler<RMAppEvent> {
    * only if the timeline service v.2 is enabled.
    *
    * @return the data for the application's collector, including collector
-   * address, collector ID. Return null if the timeline service v.2 is not
-   * enabled.
+   * address, RM ID, version and collector token. Return null if the timeline
+   * service v.2 is not enabled.
    */
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
   AppCollectorData getCollectorData();
 
   /**
+   * The timeline collector information to be sent to AM. It should be used
+   * only if the timeline service v.2 is enabled.
+   *
+   * @return collector info, including collector address and collector token.
+   * Return null if the timeline service v.2 is not enabled.
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  CollectorInfo getCollectorInfo();
+  /**
    * The original tracking url for the application master.
    * @return the original tracking url for the application master.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index fe0237e..623f564 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
 import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -167,6 +168,7 @@ public class RMAppImpl implements RMApp, Recoverable {
   private int firstAttemptIdInStateStore = 1;
   private int nextAttemptId = 1;
   private AppCollectorData collectorData;
+  private CollectorInfo collectorInfo;
   // This field isn't protected by readlock now.
   private volatile RMAppAttempt currentAttempt;
   private String queue;
@@ -528,7 +530,7 @@ public class RMAppImpl implements RMApp, Recoverable {
    */
   public void startTimelineCollector() {
     AppLevelTimelineCollector collector =
-        new AppLevelTimelineCollector(applicationId);
+        new AppLevelTimelineCollector(applicationId, user);
     rmContext.getRMTimelineCollectorManager().putIfAbsent(
         applicationId, collector);
   }
@@ -616,6 +618,12 @@ public class RMAppImpl implements RMApp, Recoverable {
 
   public void setCollectorData(AppCollectorData incomingData) {
     this.collectorData = incomingData;
+    this.collectorInfo = CollectorInfo.newInstance(
+        incomingData.getCollectorAddr(), incomingData.getCollectorToken());
+  }
+
+  public CollectorInfo getCollectorInfo() {
+    return this.collectorInfo;
   }
 
   public void removeCollectorData() {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org