You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by va...@apache.org on 2017/08/29 20:42:31 UTC

[46/50] [abbrv] hadoop git commit: MAPREDUCE-6838. [ATSv2 Security] Add timeline delegation token received in allocate response to UGI. Contributed by Varun Saxena

MAPREDUCE-6838. [ATSv2 Security] Add timeline delegation token received in allocate response to UGI. Contributed by Varun Saxena


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3fe0a6fd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3fe0a6fd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3fe0a6fd

Branch: refs/heads/YARN-5355
Commit: 3fe0a6fd6eb9df55be71817b8e8ba3d21130a71c
Parents: e9f2c26
Author: Jian He <ji...@apache.org>
Authored: Mon Aug 21 22:08:07 2017 -0700
Committer: Varun Saxena <va...@apache.org>
Committed: Wed Aug 30 01:18:45 2017 +0530

----------------------------------------------------------------------
 .../v2/app/rm/RMContainerAllocator.java         |  17 +--
 .../v2/app/rm/TestRMContainerAllocator.java     | 137 +++++++++++++++++++
 .../hadoop/yarn/api/records/CollectorInfo.java  |   4 +
 .../api/async/impl/AMRMClientAsyncImpl.java     |  13 +-
 .../yarn/client/api/TimelineV2Client.java       |  11 +-
 .../client/api/impl/TimelineV2ClientImpl.java   |  80 ++++++++++-
 .../api/impl/TestTimelineClientV2Impl.java      |  56 +++++++-
 .../timelineservice/NMTimelinePublisher.java    |   3 +-
 .../TestTimelineServiceClientIntegration.java   |  13 +-
 .../security/TestTimelineAuthFilterForV2.java   |   3 +-
 10 files changed, 301 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fe0a6fd/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 969ec4c..0dc7642 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -848,7 +848,8 @@ public class RMContainerAllocator extends RMContainerRequestor
       updateAMRMToken(response.getAMRMToken());
     }
 
-    List<ContainerStatus> finishedContainers = response.getCompletedContainersStatuses();
+    List<ContainerStatus> finishedContainers =
+        response.getCompletedContainersStatuses();
 
     // propagate preemption requests
     final PreemptionMessage preemptReq = response.getPreemptionMessage();
@@ -877,19 +878,13 @@ public class RMContainerAllocator extends RMContainerRequestor
 
     handleUpdatedNodes(response);
     handleJobPriorityChange(response);
-    // handle receiving the timeline collector address for this app
-    String collectorAddr = null;
-    if (response.getCollectorInfo() != null) {
-      collectorAddr = response.getCollectorInfo().getCollectorAddr();
-    }
-
+    // Handle receiving the timeline collector address and token for this app.
     MRAppMaster.RunningAppContext appContext =
         (MRAppMaster.RunningAppContext)this.getContext();
-    if (collectorAddr != null && !collectorAddr.isEmpty()
-        && appContext.getTimelineV2Client() != null) {
-      appContext.getTimelineV2Client().setTimelineServiceAddress(collectorAddr);
+    if (appContext.getTimelineV2Client() != null) {
+      appContext.getTimelineV2Client().
+          setTimelineCollectorInfo(response.getCollectorInfo());
     }
-
     for (ContainerStatus cont : finishedContainers) {
       processFinishedContainer(cont);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fe0a6fd/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index 6c51626..6c74a7a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.v2.app.rm;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyFloat;
 import static org.mockito.Matchers.anyInt;
@@ -27,6 +28,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -99,6 +101,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -110,6 +113,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
@@ -121,6 +125,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -137,9 +142,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
+import org.apache.hadoop.yarn.server.timelineservice.security.TimelineV2DelegationTokenSecretManagerService.TimelineV2DelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.ControlledClock;
+import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.junit.After;
 import org.junit.Assert;
@@ -749,6 +756,96 @@ public class TestRMContainerAllocator {
   }
 
   @Test
+  public void testUpdateCollectorInfo() throws Exception {
+    LOG.info("Running testUpdateCollectorInfo");
+    Configuration conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    String localAddr = "localhost:1234";
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+    // Generate a timeline delegation token.
+    TimelineDelegationTokenIdentifier ident =
+        new TimelineDelegationTokenIdentifier(new Text(ugi.getUserName()),
+        new Text("renewer"), null);
+    ident.setSequenceNumber(1);
+    Token<TimelineDelegationTokenIdentifier> collectorToken =
+        new Token<TimelineDelegationTokenIdentifier> (ident.getBytes(),
+        new byte[0], TimelineDelegationTokenIdentifier.KIND_NAME,
+        new Text(localAddr));
+    org.apache.hadoop.yarn.api.records.Token token =
+        org.apache.hadoop.yarn.api.records.Token.newInstance(
+            collectorToken.getIdentifier(), collectorToken.getKind().toString(),
+            collectorToken.getPassword(),
+            collectorToken.getService().toString());
+    CollectorInfo collectorInfo = CollectorInfo.newInstance(localAddr, token);
+    // Mock scheduler to serve Allocate request.
+    final MockSchedulerForTimelineCollector mockScheduler =
+        new MockSchedulerForTimelineCollector(collectorInfo);
+    MyContainerAllocator allocator =
+        new MyContainerAllocator(null, conf, attemptId, mockJob,
+            SystemClock.getInstance()) {
+          @Override
+          protected void register() {
+          }
+
+          @Override
+          protected ApplicationMasterProtocol createSchedulerProxy() {
+            return mockScheduler;
+          }
+        };
+    // Initially UGI should have no tokens.
+    ArrayList<Token<? extends TokenIdentifier>> tokens =
+        new ArrayList<>(ugi.getTokens());
+    assertEquals(0, tokens.size());
+    TimelineV2Client client = spy(TimelineV2Client.createTimelineClient(appId));
+    client.init(conf);
+    when(((RunningAppContext)allocator.getContext()).getTimelineV2Client()).
+        thenReturn(client);
+
+    // Send allocate request to RM and fetch collector address and token.
+    allocator.schedule();
+    verify(client).setTimelineCollectorInfo(collectorInfo);
+    // Verify if token has been updated in UGI.
+    tokens = new ArrayList<>(ugi.getTokens());
+    assertEquals(1, tokens.size());
+    assertEquals(TimelineDelegationTokenIdentifier.KIND_NAME,
+        tokens.get(0).getKind());
+    assertEquals(collectorToken.decodeIdentifier(),
+        tokens.get(0).decodeIdentifier());
+
+    // Generate new collector token, send allocate request to RM and fetch the
+    // new token.
+    ident.setSequenceNumber(100);
+    Token<TimelineDelegationTokenIdentifier> collectorToken1 =
+        new Token<TimelineDelegationTokenIdentifier> (ident.getBytes(),
+        new byte[0], TimelineDelegationTokenIdentifier.KIND_NAME,
+        new Text(localAddr));
+    token = org.apache.hadoop.yarn.api.records.Token.newInstance(
+        collectorToken1.getIdentifier(), collectorToken1.getKind().toString(),
+        collectorToken1.getPassword(), collectorToken1.getService().toString());
+    collectorInfo = CollectorInfo.newInstance(localAddr, token);
+    mockScheduler.updateCollectorInfo(collectorInfo);
+    allocator.schedule();
+    verify(client).setTimelineCollectorInfo(collectorInfo);
+    // Verify if new token has been updated in UGI.
+    tokens = new ArrayList<>(ugi.getTokens());
+    assertEquals(1, tokens.size());
+    assertEquals(TimelineDelegationTokenIdentifier.KIND_NAME,
+        tokens.get(0).getKind());
+    assertEquals(collectorToken1.decodeIdentifier(),
+        tokens.get(0).decodeIdentifier());
+    allocator.close();
+  }
+
+  @Test
   public void testMapReduceScheduling() throws Exception {
 
     LOG.info("Running testMapReduceScheduling");
@@ -3488,6 +3585,46 @@ public class TestRMContainerAllocator {
     }
   }
 
+  private static class MockSchedulerForTimelineCollector
+      implements ApplicationMasterProtocol {
+    CollectorInfo collectorInfo;
+
+    public MockSchedulerForTimelineCollector(CollectorInfo info) {
+      this.collectorInfo = info;
+    }
+
+    void updateCollectorInfo(CollectorInfo info) {
+      collectorInfo = info;
+    }
+
+    @Override
+    public RegisterApplicationMasterResponse registerApplicationMaster(
+        RegisterApplicationMasterRequest request) throws YarnException,
+        IOException {
+      return Records.newRecord(RegisterApplicationMasterResponse.class);
+    }
+
+    @Override
+    public FinishApplicationMasterResponse finishApplicationMaster(
+        FinishApplicationMasterRequest request) throws YarnException,
+        IOException {
+      return FinishApplicationMasterResponse.newInstance(false);
+    }
+
+    @Override
+    public AllocateResponse allocate(AllocateRequest request)
+        throws YarnException, IOException {
+      AllocateResponse response =  AllocateResponse.newInstance(
+          request.getResponseId(), Collections.<ContainerStatus>emptyList(),
+          Collections.<Container>emptyList(),
+          Collections.<NodeReport>emptyList(),
+          Resource.newInstance(512000, 1024), null, 10, null,
+          Collections.<NMToken>emptyList());
+      response.setCollectorInfo(collectorInfo);
+      return response;
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     TestRMContainerAllocator t = new TestRMContainerAllocator();
     t.testSimple();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fe0a6fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java
index 960c992..d22b9fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java
@@ -32,6 +32,10 @@ public abstract class CollectorInfo {
 
   protected static final long DEFAULT_TIMESTAMP_VALUE = -1;
 
+  public static CollectorInfo newInstance(String collectorAddr) {
+    return newInstance(collectorAddr, null);
+  }
+
   public static CollectorInfo newInstance(String collectorAddr, Token token) {
     CollectorInfo amCollectorInfo =
         Records.newRecord(CollectorInfo.class);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fe0a6fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index 265badb..d12b108 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -68,8 +68,6 @@ extends AMRMClientAsync<T> {
   
   private volatile boolean keepRunning;
   private volatile float progress;
-  
-  private volatile String collectorAddr;
 
   /**
    *
@@ -332,14 +330,9 @@ extends AMRMClientAsync<T> {
 
           TimelineV2Client timelineClient =
               client.getRegisteredTimelineV2Client();
-          if (timelineClient != null && collectorAddress != null
-              && !collectorAddress.isEmpty()) {
-            if (collectorAddr == null
-                || !collectorAddr.equals(collectorAddress)) {
-              collectorAddr = collectorAddress;
-              timelineClient.setTimelineServiceAddress(collectorAddress);
-              LOG.info("collectorAddress " + collectorAddress);
-            }
+          if (timelineClient != null && response.getCollectorInfo() != null) {
+            timelineClient.
+                setTimelineCollectorInfo(response.getCollectorInfo());
           }
 
           List<NodeReport> updatedNodes = response.getUpdatedNodes();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fe0a6fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java
index 32cf1e9..da81a91 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java
@@ -23,6 +23,8 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -83,10 +85,13 @@ public abstract class TimelineV2Client extends CompositeService {
 
   /**
    * <p>
-   * Update the timeline service address where the request will be sent to.
+   * Update collector info received in AllocateResponse which contains the
+   * timeline service address where the request will be sent to and the timeline
+   * delegation token which will be used to send the request.
    * </p>
    *
-   * @param address the timeline service address
+   * @param collectorInfo Collector info which contains the timeline service
+   * address and timeline delegation token.
    */
-  public abstract void setTimelineServiceAddress(String address);
+  public abstract void setTimelineCollectorInfo(CollectorInfo collectorInfo);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fe0a6fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
index 128ae7a..97d1364 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.client.api.impl;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.BlockingQueue;
@@ -39,15 +40,22 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.client.api.TimelineV2Client;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.core.util.MultivaluedMapImpl;
 
@@ -62,6 +70,8 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
 
   private TimelineEntityDispatcher entityDispatcher;
   private volatile String timelineServiceAddress;
+  @VisibleForTesting
+  volatile Token currentTimelineToken = null;
 
   // Retry parameters for identifying new timeline service
   // TODO consider to merge with connection retry
@@ -100,7 +110,6 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
       authUgi = ugi;
       doAsUser = null;
     }
-
     // TODO need to add/cleanup filter retry later for ATSV2. similar to V1
     DelegationTokenAuthenticatedURL.Token token =
         new DelegationTokenAuthenticatedURL.Token();
@@ -144,8 +153,73 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
   }
 
   @Override
-  public void setTimelineServiceAddress(String address) {
-    this.timelineServiceAddress = address;
+  public void setTimelineCollectorInfo(CollectorInfo collectorInfo) {
+    if (collectorInfo == null) {
+      LOG.warn("Not setting collector info as it is null.");
+      return;
+    }
+    // First update the token so that it is available when collector address is
+    // used.
+    if (collectorInfo.getCollectorToken() != null) {
+      // Use collector address to update token service if it's not available.
+      setTimelineDelegationToken(
+          collectorInfo.getCollectorToken(), collectorInfo.getCollectorAddr());
+    }
+    // Update timeline service address.
+    if (collectorInfo.getCollectorAddr() != null &&
+        !collectorInfo.getCollectorAddr().isEmpty() &&
+        !collectorInfo.getCollectorAddr().equals(timelineServiceAddress)) {
+      this.timelineServiceAddress = collectorInfo.getCollectorAddr();
+      LOG.info("Updated timeline service address to " + timelineServiceAddress);
+    }
+  }
+
+  private void setTimelineDelegationToken(Token delegationToken,
+      String collectorAddr) {
+    // Checks below are to ensure that an invalid token is not updated in UGI.
+    // This is required because timeline token is set via a public API.
+    if (!delegationToken.getKind().equals(
+        TimelineDelegationTokenIdentifier.KIND_NAME.toString())) {
+      LOG.warn("Timeline token to be updated should be of kind " +
+          TimelineDelegationTokenIdentifier.KIND_NAME);
+      return;
+    }
+    if (collectorAddr == null || collectorAddr.isEmpty()) {
+      collectorAddr = timelineServiceAddress;
+    }
+    // Token is not updated when neither a timeline service address nor a
+    // token service is available.
+    String service = delegationToken.getService();
+    if ((service == null || service.isEmpty()) &&
+        (collectorAddr == null || collectorAddr.isEmpty())) {
+      LOG.warn("Timeline token does not have service and timeline service " +
+          "address is not yet set. Not updating the token");
+      return;
+    }
+    // No need to update a duplicate token.
+    if (currentTimelineToken != null &&
+        currentTimelineToken.equals(delegationToken)) {
+      return;
+    }
+    currentTimelineToken = delegationToken;
+    // Convert the token, sanitize the token service and add it to UGI.
+    org.apache.hadoop.security.token.
+        Token<TimelineDelegationTokenIdentifier> timelineToken =
+            new org.apache.hadoop.security.token.
+            Token<TimelineDelegationTokenIdentifier>(
+                delegationToken.getIdentifier().array(),
+                delegationToken.getPassword().array(),
+                new Text(delegationToken.getKind()),
+                service == null ? new Text() : new Text(service));
+    // Prefer timeline service address over service coming in the token for
+    // updating the token service.
+    InetSocketAddress serviceAddr =
+        (collectorAddr != null && !collectorAddr.isEmpty()) ?
+        NetUtils.createSocketAddr(collectorAddr) :
+        SecurityUtil.getTokenServiceAddr(timelineToken);
+    SecurityUtil.setTokenService(timelineToken, serviceAddr);
+    authUgi.addToken(timelineToken);
+    LOG.info("Updated timeline delegation token " + timelineToken);
   }
 
   @Private

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fe0a6fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
index c5b02fd..95595a9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
@@ -18,6 +18,11 @@
 
 package org.apache.hadoop.yarn.client.api.impl;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
@@ -27,11 +32,16 @@ import javax.ws.rs.core.MultivaluedMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -151,7 +161,7 @@ public class TestTimelineClientV2Impl {
         maxRetries);
     c.init(conf);
     c.start();
-    c.setTimelineServiceAddress("localhost:12345");
+    c.setTimelineCollectorInfo(CollectorInfo.newInstance("localhost:12345"));
     try {
       c.putEntities(new TimelineEntity());
     } catch (IOException e) {
@@ -311,6 +321,50 @@ public class TestTimelineClientV2Impl {
   }
 
   @Test
+  public void testSetTimelineToken() throws Exception {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    assertEquals(0, ugi.getTokens().size());
+    assertNull("Timeline token in v2 client should not be set",
+        client.currentTimelineToken);
+
+    Token token = Token.newInstance(
+        new byte[0], "kind", new byte[0], "service");
+    client.setTimelineCollectorInfo(CollectorInfo.newInstance(null, token));
+    assertNull("Timeline token in v2 client should not be set as token kind " +
+        "is unexepcted.", client.currentTimelineToken);
+    assertEquals(0, ugi.getTokens().size());
+
+    token = Token.newInstance(new byte[0], TimelineDelegationTokenIdentifier.
+        KIND_NAME.toString(), new byte[0], null);
+    client.setTimelineCollectorInfo(CollectorInfo.newInstance(null, token));
+    assertNull("Timeline token in v2 client should not be set as serice is " +
+        "not set.", client.currentTimelineToken);
+    assertEquals(0, ugi.getTokens().size());
+
+    TimelineDelegationTokenIdentifier ident =
+        new TimelineDelegationTokenIdentifier(new Text(ugi.getUserName()),
+        new Text("renewer"), null);
+    ident.setSequenceNumber(1);
+    token = Token.newInstance(ident.getBytes(),
+        TimelineDelegationTokenIdentifier.KIND_NAME.toString(), new byte[0],
+        "localhost:1234");
+    client.setTimelineCollectorInfo(CollectorInfo.newInstance(null, token));
+    assertEquals(1, ugi.getTokens().size());
+    assertNotNull("Timeline token should be set in v2 client.",
+        client.currentTimelineToken);
+    assertEquals(token, client.currentTimelineToken);
+
+    ident.setSequenceNumber(20);
+    Token newToken = Token.newInstance(ident.getBytes(),
+        TimelineDelegationTokenIdentifier.KIND_NAME.toString(), new byte[0],
+        "localhost:1234");
+    client.setTimelineCollectorInfo(CollectorInfo.newInstance(null, newToken));
+    assertEquals(1, ugi.getTokens().size());
+    assertNotEquals(token, client.currentTimelineToken);
+    assertEquals(newToken, client.currentTimelineToken);
+  }
+
+  @Test
   public void testAfterStop() throws Exception {
     client.setSleepBeforeReturn(true);
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fe0a6fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
index 34eddf7..b8192ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -437,7 +438,7 @@ public class NMTimelinePublisher extends CompositeService {
       String collectorAddr) {
     TimelineV2Client client = appToClientMap.get(appId);
     if (client != null) {
-      client.setTimelineServiceAddress(collectorAddr);
+      client.setTimelineCollectorInfo(CollectorInfo.newInstance(collectorAddr));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fe0a6fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
index eb4381d..6a5ef55 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationAttemptEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
@@ -99,9 +100,9 @@ public class TestTimelineServiceClientIntegration {
     TimelineV2Client client =
         TimelineV2Client.createTimelineClient(ApplicationId.newInstance(0, 1));
     try {
-      // set the timeline service address manually
-      client.setTimelineServiceAddress(
-          collectorManager.getRestServerBindAddress());
+      // Set the timeline collector address manually via CollectorInfo.
+      client.setTimelineCollectorInfo(CollectorInfo.newInstance(
+          collectorManager.getRestServerBindAddress()));
       client.init(conf);
       client.start();
       TimelineEntity entity = new TimelineEntity();
@@ -126,9 +127,9 @@ public class TestTimelineServiceClientIntegration {
     TimelineV2Client client =
         TimelineV2Client.createTimelineClient(appId);
     try {
-      // set the timeline service address manually
-      client.setTimelineServiceAddress(
-          collectorManager.getRestServerBindAddress());
+      // Set the timeline collector address manually via CollectorInfo.
+      client.setTimelineCollectorInfo(CollectorInfo.newInstance(
+          collectorManager.getRestServerBindAddress()));
       client.init(conf);
       client.start();
       ClusterEntity cluster = new ClusterEntity();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fe0a6fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java
index bc1594c..75f17fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.client.api.TimelineV2Client;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -228,7 +229,7 @@ public class TestTimelineAuthFilterForV2 {
     String restBindAddr = collectorManager.getRestServerBindAddress();
     String addr =
         "localhost" + restBindAddr.substring(restBindAddr.indexOf(":"));
-    client.setTimelineServiceAddress(addr);
+    client.setTimelineCollectorInfo(CollectorInfo.newInstance(addr));
     client.init(conf);
     client.start();
     return client;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org