You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by va...@apache.org on 2017/08/29 20:42:31 UTC
[46/50] [abbrv] hadoop git commit: MAPREDUCE-6838. [ATSv2 Security]
Add timeline delegation token received in allocate response to UGI.
Contributed by Varun Saxena
MAPREDUCE-6838. [ATSv2 Security] Add timeline delegation token received in allocate response to UGI. Contributed by Varun Saxena
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3fe0a6fd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3fe0a6fd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3fe0a6fd
Branch: refs/heads/YARN-5355
Commit: 3fe0a6fd6eb9df55be71817b8e8ba3d21130a71c
Parents: e9f2c26
Author: Jian He <ji...@apache.org>
Authored: Mon Aug 21 22:08:07 2017 -0700
Committer: Varun Saxena <va...@apache.org>
Committed: Wed Aug 30 01:18:45 2017 +0530
----------------------------------------------------------------------
.../v2/app/rm/RMContainerAllocator.java | 17 +--
.../v2/app/rm/TestRMContainerAllocator.java | 137 +++++++++++++++++++
.../hadoop/yarn/api/records/CollectorInfo.java | 4 +
.../api/async/impl/AMRMClientAsyncImpl.java | 13 +-
.../yarn/client/api/TimelineV2Client.java | 11 +-
.../client/api/impl/TimelineV2ClientImpl.java | 80 ++++++++++-
.../api/impl/TestTimelineClientV2Impl.java | 56 +++++++-
.../timelineservice/NMTimelinePublisher.java | 3 +-
.../TestTimelineServiceClientIntegration.java | 13 +-
.../security/TestTimelineAuthFilterForV2.java | 3 +-
10 files changed, 301 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fe0a6fd/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 969ec4c..0dc7642 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -848,7 +848,8 @@ public class RMContainerAllocator extends RMContainerRequestor
updateAMRMToken(response.getAMRMToken());
}
- List<ContainerStatus> finishedContainers = response.getCompletedContainersStatuses();
+ List<ContainerStatus> finishedContainers =
+ response.getCompletedContainersStatuses();
// propagate preemption requests
final PreemptionMessage preemptReq = response.getPreemptionMessage();
@@ -877,19 +878,13 @@ public class RMContainerAllocator extends RMContainerRequestor
handleUpdatedNodes(response);
handleJobPriorityChange(response);
- // handle receiving the timeline collector address for this app
- String collectorAddr = null;
- if (response.getCollectorInfo() != null) {
- collectorAddr = response.getCollectorInfo().getCollectorAddr();
- }
-
+ // Handle receiving the timeline collector address and token for this app.
MRAppMaster.RunningAppContext appContext =
(MRAppMaster.RunningAppContext)this.getContext();
- if (collectorAddr != null && !collectorAddr.isEmpty()
- && appContext.getTimelineV2Client() != null) {
- appContext.getTimelineV2Client().setTimelineServiceAddress(collectorAddr);
+ if (appContext.getTimelineV2Client() != null) {
+ appContext.getTimelineV2Client().
+ setTimelineCollectorInfo(response.getCollectorInfo());
}
-
for (ContainerStatus cont : finishedContainers) {
processFinishedContainer(cont);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fe0a6fd/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index 6c51626..6c74a7a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.v2.app.rm;
+import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyFloat;
import static org.mockito.Matchers.anyInt;
@@ -27,6 +28,7 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -99,6 +101,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -110,6 +113,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
@@ -121,6 +125,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -137,9 +142,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
+import org.apache.hadoop.yarn.server.timelineservice.security.TimelineV2DelegationTokenSecretManagerService.TimelineV2DelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock;
+import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.After;
import org.junit.Assert;
@@ -749,6 +756,96 @@ public class TestRMContainerAllocator {
}
@Test
+ public void testUpdateCollectorInfo() throws Exception {
+ LOG.info("Running testUpdateCollectorInfo");
+ Configuration conf = new Configuration();
+ conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+ conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+ ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 0);
+ Job mockJob = mock(Job.class);
+ when(mockJob.getReport()).thenReturn(
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+ String localAddr = "localhost:1234";
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+ // Generate a timeline delegation token.
+ TimelineDelegationTokenIdentifier ident =
+ new TimelineDelegationTokenIdentifier(new Text(ugi.getUserName()),
+ new Text("renewer"), null);
+ ident.setSequenceNumber(1);
+ Token<TimelineDelegationTokenIdentifier> collectorToken =
+ new Token<TimelineDelegationTokenIdentifier> (ident.getBytes(),
+ new byte[0], TimelineDelegationTokenIdentifier.KIND_NAME,
+ new Text(localAddr));
+ org.apache.hadoop.yarn.api.records.Token token =
+ org.apache.hadoop.yarn.api.records.Token.newInstance(
+ collectorToken.getIdentifier(), collectorToken.getKind().toString(),
+ collectorToken.getPassword(),
+ collectorToken.getService().toString());
+ CollectorInfo collectorInfo = CollectorInfo.newInstance(localAddr, token);
+ // Mock scheduler to serve Allocate request.
+ final MockSchedulerForTimelineCollector mockScheduler =
+ new MockSchedulerForTimelineCollector(collectorInfo);
+ MyContainerAllocator allocator =
+ new MyContainerAllocator(null, conf, attemptId, mockJob,
+ SystemClock.getInstance()) {
+ @Override
+ protected void register() {
+ }
+
+ @Override
+ protected ApplicationMasterProtocol createSchedulerProxy() {
+ return mockScheduler;
+ }
+ };
+ // Initially UGI should have no tokens.
+ ArrayList<Token<? extends TokenIdentifier>> tokens =
+ new ArrayList<>(ugi.getTokens());
+ assertEquals(0, tokens.size());
+ TimelineV2Client client = spy(TimelineV2Client.createTimelineClient(appId));
+ client.init(conf);
+ when(((RunningAppContext)allocator.getContext()).getTimelineV2Client()).
+ thenReturn(client);
+
+ // Send allocate request to RM and fetch collector address and token.
+ allocator.schedule();
+ verify(client).setTimelineCollectorInfo(collectorInfo);
+ // Verify if token has been updated in UGI.
+ tokens = new ArrayList<>(ugi.getTokens());
+ assertEquals(1, tokens.size());
+ assertEquals(TimelineDelegationTokenIdentifier.KIND_NAME,
+ tokens.get(0).getKind());
+ assertEquals(collectorToken.decodeIdentifier(),
+ tokens.get(0).decodeIdentifier());
+
+ // Generate new collector token, send allocate request to RM and fetch the
+ // new token.
+ ident.setSequenceNumber(100);
+ Token<TimelineDelegationTokenIdentifier> collectorToken1 =
+ new Token<TimelineDelegationTokenIdentifier> (ident.getBytes(),
+ new byte[0], TimelineDelegationTokenIdentifier.KIND_NAME,
+ new Text(localAddr));
+ token = org.apache.hadoop.yarn.api.records.Token.newInstance(
+ collectorToken1.getIdentifier(), collectorToken1.getKind().toString(),
+ collectorToken1.getPassword(), collectorToken1.getService().toString());
+ collectorInfo = CollectorInfo.newInstance(localAddr, token);
+ mockScheduler.updateCollectorInfo(collectorInfo);
+ allocator.schedule();
+ verify(client).setTimelineCollectorInfo(collectorInfo);
+ // Verify if new token has been updated in UGI.
+ tokens = new ArrayList<>(ugi.getTokens());
+ assertEquals(1, tokens.size());
+ assertEquals(TimelineDelegationTokenIdentifier.KIND_NAME,
+ tokens.get(0).getKind());
+ assertEquals(collectorToken1.decodeIdentifier(),
+ tokens.get(0).decodeIdentifier());
+ allocator.close();
+ }
+
+ @Test
public void testMapReduceScheduling() throws Exception {
LOG.info("Running testMapReduceScheduling");
@@ -3488,6 +3585,46 @@ public class TestRMContainerAllocator {
}
}
+ private static class MockSchedulerForTimelineCollector
+ implements ApplicationMasterProtocol {
+ CollectorInfo collectorInfo;
+
+ public MockSchedulerForTimelineCollector(CollectorInfo info) {
+ this.collectorInfo = info;
+ }
+
+ void updateCollectorInfo(CollectorInfo info) {
+ collectorInfo = info;
+ }
+
+ @Override
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ RegisterApplicationMasterRequest request) throws YarnException,
+ IOException {
+ return Records.newRecord(RegisterApplicationMasterResponse.class);
+ }
+
+ @Override
+ public FinishApplicationMasterResponse finishApplicationMaster(
+ FinishApplicationMasterRequest request) throws YarnException,
+ IOException {
+ return FinishApplicationMasterResponse.newInstance(false);
+ }
+
+ @Override
+ public AllocateResponse allocate(AllocateRequest request)
+ throws YarnException, IOException {
+ AllocateResponse response = AllocateResponse.newInstance(
+ request.getResponseId(), Collections.<ContainerStatus>emptyList(),
+ Collections.<Container>emptyList(),
+ Collections.<NodeReport>emptyList(),
+ Resource.newInstance(512000, 1024), null, 10, null,
+ Collections.<NMToken>emptyList());
+ response.setCollectorInfo(collectorInfo);
+ return response;
+ }
+ }
+
public static void main(String[] args) throws Exception {
TestRMContainerAllocator t = new TestRMContainerAllocator();
t.testSimple();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fe0a6fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java
index 960c992..d22b9fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java
@@ -32,6 +32,10 @@ public abstract class CollectorInfo {
protected static final long DEFAULT_TIMESTAMP_VALUE = -1;
+ public static CollectorInfo newInstance(String collectorAddr) {
+ return newInstance(collectorAddr, null);
+ }
+
public static CollectorInfo newInstance(String collectorAddr, Token token) {
CollectorInfo amCollectorInfo =
Records.newRecord(CollectorInfo.class);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fe0a6fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index 265badb..d12b108 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -68,8 +68,6 @@ extends AMRMClientAsync<T> {
private volatile boolean keepRunning;
private volatile float progress;
-
- private volatile String collectorAddr;
/**
*
@@ -332,14 +330,9 @@ extends AMRMClientAsync<T> {
TimelineV2Client timelineClient =
client.getRegisteredTimelineV2Client();
- if (timelineClient != null && collectorAddress != null
- && !collectorAddress.isEmpty()) {
- if (collectorAddr == null
- || !collectorAddr.equals(collectorAddress)) {
- collectorAddr = collectorAddress;
- timelineClient.setTimelineServiceAddress(collectorAddress);
- LOG.info("collectorAddress " + collectorAddress);
- }
+ if (timelineClient != null && response.getCollectorInfo() != null) {
+ timelineClient.
+ setTimelineCollectorInfo(response.getCollectorInfo());
}
List<NodeReport> updatedNodes = response.getUpdatedNodes();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fe0a6fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java
index 32cf1e9..da81a91 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java
@@ -23,6 +23,8 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -83,10 +85,13 @@ public abstract class TimelineV2Client extends CompositeService {
/**
* <p>
- * Update the timeline service address where the request will be sent to.
+ * Update collector info received in AllocateResponse which contains the
+ * timeline service address where the request will be sent to and the timeline
+ * delegation token which will be used to send the request.
* </p>
*
- * @param address the timeline service address
+ * @param collectorInfo Collector info which contains the timeline service
+ * address and timeline delegation token.
*/
- public abstract void setTimelineServiceAddress(String address);
+ public abstract void setTimelineCollectorInfo(CollectorInfo collectorInfo);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fe0a6fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
index 128ae7a..97d1364 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetSocketAddress;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.BlockingQueue;
@@ -39,15 +40,22 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
+import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.core.util.MultivaluedMapImpl;
@@ -62,6 +70,8 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
private TimelineEntityDispatcher entityDispatcher;
private volatile String timelineServiceAddress;
+ @VisibleForTesting
+ volatile Token currentTimelineToken = null;
// Retry parameters for identifying new timeline service
// TODO consider to merge with connection retry
@@ -100,7 +110,6 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
authUgi = ugi;
doAsUser = null;
}
-
// TODO need to add/cleanup filter retry later for ATSV2. similar to V1
DelegationTokenAuthenticatedURL.Token token =
new DelegationTokenAuthenticatedURL.Token();
@@ -144,8 +153,73 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
}
@Override
- public void setTimelineServiceAddress(String address) {
- this.timelineServiceAddress = address;
+ public void setTimelineCollectorInfo(CollectorInfo collectorInfo) {
+ if (collectorInfo == null) {
+ LOG.warn("Not setting collector info as it is null.");
+ return;
+ }
+ // First update the token so that it is available when collector address is
+ // used.
+ if (collectorInfo.getCollectorToken() != null) {
+ // Use collector address to update token service if it's not available.
+ setTimelineDelegationToken(
+ collectorInfo.getCollectorToken(), collectorInfo.getCollectorAddr());
+ }
+ // Update timeline service address.
+ if (collectorInfo.getCollectorAddr() != null &&
+ !collectorInfo.getCollectorAddr().isEmpty() &&
+ !collectorInfo.getCollectorAddr().equals(timelineServiceAddress)) {
+ this.timelineServiceAddress = collectorInfo.getCollectorAddr();
+ LOG.info("Updated timeline service address to " + timelineServiceAddress);
+ }
+ }
+
+ private void setTimelineDelegationToken(Token delegationToken,
+ String collectorAddr) {
+ // Checks below are to ensure that an invalid token is not updated in UGI.
+ // This is required because timeline token is set via a public API.
+ if (!delegationToken.getKind().equals(
+ TimelineDelegationTokenIdentifier.KIND_NAME.toString())) {
+ LOG.warn("Timeline token to be updated should be of kind " +
+ TimelineDelegationTokenIdentifier.KIND_NAME);
+ return;
+ }
+ if (collectorAddr == null || collectorAddr.isEmpty()) {
+ collectorAddr = timelineServiceAddress;
+ }
+ // Token is not updated if neither the address nor the token service
+ // exists.
+ String service = delegationToken.getService();
+ if ((service == null || service.isEmpty()) &&
+ (collectorAddr == null || collectorAddr.isEmpty())) {
+ LOG.warn("Timeline token does not have service and timeline service " +
+ "address is not yet set. Not updating the token");
+ return;
+ }
+ // No need to update a duplicate token.
+ if (currentTimelineToken != null &&
+ currentTimelineToken.equals(delegationToken)) {
+ return;
+ }
+ currentTimelineToken = delegationToken;
+ // Convert the token, sanitize the token service and add it to UGI.
+ org.apache.hadoop.security.token.
+ Token<TimelineDelegationTokenIdentifier> timelineToken =
+ new org.apache.hadoop.security.token.
+ Token<TimelineDelegationTokenIdentifier>(
+ delegationToken.getIdentifier().array(),
+ delegationToken.getPassword().array(),
+ new Text(delegationToken.getKind()),
+ service == null ? new Text() : new Text(service));
+ // Prefer timeline service address over service coming in the token for
+ // updating the token service.
+ InetSocketAddress serviceAddr =
+ (collectorAddr != null && !collectorAddr.isEmpty()) ?
+ NetUtils.createSocketAddr(collectorAddr) :
+ SecurityUtil.getTokenServiceAddr(timelineToken);
+ SecurityUtil.setTokenService(timelineToken, serviceAddr);
+ authUgi.addToken(timelineToken);
+ LOG.info("Updated timeline delegation token " + timelineToken);
}
@Private
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fe0a6fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
index c5b02fd..95595a9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
@@ -18,6 +18,11 @@
package org.apache.hadoop.yarn.client.api.impl;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
@@ -27,11 +32,16 @@ import javax.ws.rs.core.MultivaluedMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -151,7 +161,7 @@ public class TestTimelineClientV2Impl {
maxRetries);
c.init(conf);
c.start();
- c.setTimelineServiceAddress("localhost:12345");
+ c.setTimelineCollectorInfo(CollectorInfo.newInstance("localhost:12345"));
try {
c.putEntities(new TimelineEntity());
} catch (IOException e) {
@@ -311,6 +321,50 @@ public class TestTimelineClientV2Impl {
}
@Test
+ public void testSetTimelineToken() throws Exception {
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ assertEquals(0, ugi.getTokens().size());
+ assertNull("Timeline token in v2 client should not be set",
+ client.currentTimelineToken);
+
+ Token token = Token.newInstance(
+ new byte[0], "kind", new byte[0], "service");
+ client.setTimelineCollectorInfo(CollectorInfo.newInstance(null, token));
+ assertNull("Timeline token in v2 client should not be set as token kind " +
+ "is unexepcted.", client.currentTimelineToken);
+ assertEquals(0, ugi.getTokens().size());
+
+ token = Token.newInstance(new byte[0], TimelineDelegationTokenIdentifier.
+ KIND_NAME.toString(), new byte[0], null);
+ client.setTimelineCollectorInfo(CollectorInfo.newInstance(null, token));
+ assertNull("Timeline token in v2 client should not be set as serice is " +
+ "not set.", client.currentTimelineToken);
+ assertEquals(0, ugi.getTokens().size());
+
+ TimelineDelegationTokenIdentifier ident =
+ new TimelineDelegationTokenIdentifier(new Text(ugi.getUserName()),
+ new Text("renewer"), null);
+ ident.setSequenceNumber(1);
+ token = Token.newInstance(ident.getBytes(),
+ TimelineDelegationTokenIdentifier.KIND_NAME.toString(), new byte[0],
+ "localhost:1234");
+ client.setTimelineCollectorInfo(CollectorInfo.newInstance(null, token));
+ assertEquals(1, ugi.getTokens().size());
+ assertNotNull("Timeline token should be set in v2 client.",
+ client.currentTimelineToken);
+ assertEquals(token, client.currentTimelineToken);
+
+ ident.setSequenceNumber(20);
+ Token newToken = Token.newInstance(ident.getBytes(),
+ TimelineDelegationTokenIdentifier.KIND_NAME.toString(), new byte[0],
+ "localhost:1234");
+ client.setTimelineCollectorInfo(CollectorInfo.newInstance(null, newToken));
+ assertEquals(1, ugi.getTokens().size());
+ assertNotEquals(token, client.currentTimelineToken);
+ assertEquals(newToken, client.currentTimelineToken);
+ }
+
+ @Test
public void testAfterStop() throws Exception {
client.setSleepBeforeReturn(true);
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fe0a6fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
index 34eddf7..b8192ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -437,7 +438,7 @@ public class NMTimelinePublisher extends CompositeService {
String collectorAddr) {
TimelineV2Client client = appToClientMap.get(appId);
if (client != null) {
- client.setTimelineServiceAddress(collectorAddr);
+ client.setTimelineCollectorInfo(CollectorInfo.newInstance(collectorAddr));
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fe0a6fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
index eb4381d..6a5ef55 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationAttemptEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
@@ -99,9 +100,9 @@ public class TestTimelineServiceClientIntegration {
TimelineV2Client client =
TimelineV2Client.createTimelineClient(ApplicationId.newInstance(0, 1));
try {
- // set the timeline service address manually
- client.setTimelineServiceAddress(
- collectorManager.getRestServerBindAddress());
+ // Set the timeline service address manually.
+ client.setTimelineCollectorInfo(CollectorInfo.newInstance(
+ collectorManager.getRestServerBindAddress()));
client.init(conf);
client.start();
TimelineEntity entity = new TimelineEntity();
@@ -126,9 +127,9 @@ public class TestTimelineServiceClientIntegration {
TimelineV2Client client =
TimelineV2Client.createTimelineClient(appId);
try {
- // set the timeline service address manually
- client.setTimelineServiceAddress(
- collectorManager.getRestServerBindAddress());
+ // Set the timeline service address manually.
+ client.setTimelineCollectorInfo(CollectorInfo.newInstance(
+ collectorManager.getRestServerBindAddress()));
client.init(conf);
client.start();
ClusterEntity cluster = new ClusterEntity();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fe0a6fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java
index bc1594c..75f17fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -228,7 +229,7 @@ public class TestTimelineAuthFilterForV2 {
String restBindAddr = collectorManager.getRestServerBindAddress();
String addr =
"localhost" + restBindAddr.substring(restBindAddr.indexOf(":"));
- client.setTimelineServiceAddress(addr);
+ client.setTimelineCollectorInfo(CollectorInfo.newInstance(addr));
client.init(conf);
client.start();
return client;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org