You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ro...@apache.org on 2017/08/11 07:09:48 UTC
[2/2] hadoop git commit: YARN-6130. [ATSv2 Security] Generate a
delegation token for AM when app collector is created and pass it to AM via
NM and RM. Contributed by Varun Saxena.
YARN-6130. [ATSv2 Security] Generate a delegation token for AM when app collector is created and pass it to AM via NM and RM. Contributed by Varun Saxena.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/79806939
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/79806939
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/79806939
Branch: refs/heads/YARN-5355_branch2
Commit: 798069390739ab6971ca038aad4cd0adc4b9855a
Parents: e2ffa0f
Author: Rohith Sharma K S <ro...@apache.org>
Authored: Fri Aug 11 12:35:35 2017 +0530
Committer: Rohith Sharma K S <ro...@apache.org>
Committed: Fri Aug 11 12:35:35 2017 +0530
----------------------------------------------------------------------
.../v2/app/rm/RMContainerAllocator.java | 9 +-
.../app/local/TestLocalContainerAllocator.java | 2 +-
.../api/protocolrecords/AllocateResponse.java | 92 +++++++++---
.../hadoop/yarn/api/records/CollectorInfo.java | 55 +++++++
.../src/main/proto/yarn_protos.proto | 5 +
.../src/main/proto/yarn_service_protos.proto | 2 +-
.../api/async/impl/AMRMClientAsyncImpl.java | 6 +-
.../ApplicationMasterServiceProtoTestBase.java | 72 +++++++++
.../hadoop/yarn/client/ProtocolHATestBase.java | 22 ++-
...ationMasterServiceProtocolForTimelineV2.java | 71 +++++++++
...estApplicationMasterServiceProtocolOnHA.java | 46 +-----
.../api/async/impl/TestAMRMClientAsync.java | 4 +-
.../impl/pb/AllocateResponsePBImpl.java | 37 ++++-
.../records/impl/pb/CollectorInfoPBImpl.java | 148 +++++++++++++++++++
.../hadoop/yarn/api/TestPBImplRecords.java | 2 +
.../ReportNewCollectorInfoRequest.java | 5 +-
.../impl/pb/NodeHeartbeatRequestPBImpl.java | 25 +++-
.../impl/pb/NodeHeartbeatResponsePBImpl.java | 21 ++-
.../pb/ReportNewCollectorInfoRequestPBImpl.java | 4 +-
.../server/api/records/AppCollectorData.java | 27 +++-
.../records/impl/pb/AppCollectorDataPBImpl.java | 29 +++-
.../yarn_server_common_service_protos.proto | 2 +
.../java/org/apache/hadoop/yarn/TestRPC.java | 30 +++-
.../hadoop/yarn/TestYarnServerApiClasses.java | 4 +-
.../nodemanager/NodeStatusUpdaterImpl.java | 1 -
.../application/ApplicationImpl.java | 2 +-
.../amrmproxy/MockResourceManagerFacade.java | 2 +-
.../ApplicationMasterService.java | 10 +-
.../server/resourcemanager/rmapp/RMApp.java | 15 +-
.../server/resourcemanager/rmapp/RMAppImpl.java | 10 +-
.../applicationsmanager/MockAsm.java | 6 +
.../server/resourcemanager/rmapp/MockRMApp.java | 7 +-
.../TestTimelineServiceClientIntegration.java | 2 +-
.../security/TestTimelineAuthFilterForV2.java | 121 +++++++++++----
.../collector/AppLevelTimelineCollector.java | 24 +++
.../AppLevelTimelineCollectorWithAgg.java | 4 +-
.../collector/NodeTimelineCollectorManager.java | 83 +++++++++--
.../PerNodeTimelineCollectorsAuxService.java | 7 +-
...neV2DelegationTokenSecretManagerService.java | 31 ++++
.../TestNMTimelineCollectorManager.java | 4 +-
40 files changed, 887 insertions(+), 162 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 218e218..d681940 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -860,13 +860,16 @@ public class RMContainerAllocator extends RMContainerRequestor
handleUpdatedNodes(response);
handleJobPriorityChange(response);
// handle receiving the timeline collector address for this app
- String collectorAddr = response.getCollectorAddr();
+ String collectorAddr = null;
+ if (response.getCollectorInfo() != null) {
+ collectorAddr = response.getCollectorInfo().getCollectorAddr();
+ }
+
MRAppMaster.RunningAppContext appContext =
(MRAppMaster.RunningAppContext)this.getContext();
if (collectorAddr != null && !collectorAddr.isEmpty()
&& appContext.getTimelineV2Client() != null) {
- appContext.getTimelineV2Client().setTimelineServiceAddress(
- response.getCollectorAddr());
+ appContext.getTimelineV2Client().setTimelineServiceAddress(collectorAddr);
}
for (ContainerStatus cont : finishedContainers) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
index 3fa0043..9549b68 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
@@ -297,7 +297,7 @@ public class TestLocalContainerAllocator {
Resources.none(), null, 1, null,
Collections.<NMToken>emptyList(),
yarnToken,
- Collections.<UpdatedContainer>emptyList());
+ Collections.<UpdatedContainer>emptyList(), null);
response.setApplicationPriority(Priority.newInstance(0));
return response;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
index c369c3c..b2b40a0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
@@ -96,7 +97,8 @@ public abstract class AllocateResponse {
/**
* Use {@link AllocateResponse#newInstance(int, List, List, List, Resource,
- * AMCommand, int, PreemptionMessage, List, Token, List)} instead
+ * AMCommand, int, PreemptionMessage, List, Token, List, CollectorInfo)}
+ * instead.
* @param responseId responseId
* @param completedContainers completedContainers
* @param allocatedContainers allocatedContainers
@@ -117,10 +119,14 @@ public abstract class AllocateResponse {
Resource availResources, AMCommand command, int numClusterNodes,
PreemptionMessage preempt, List<NMToken> nmTokens,
List<ContainerResourceIncrease> increasedContainers,
- List<ContainerResourceDecrease> decreasedContainers) {
- return newInstance(responseId, completedContainers, allocatedContainers,
- updatedNodes, availResources, command, numClusterNodes, preempt,
- nmTokens);
+ List<ContainerResourceDecrease> decreasedContainers,
+ CollectorInfo collectorInfo) {
+ return AllocateResponse.newBuilder().responseId(responseId)
+ .completedContainersStatuses(completedContainers)
+ .allocatedContainers(allocatedContainers)
+ .updatedNodes(updatedNodes).availableResources(availResources)
+ .amCommand(command).nmTokens(nmTokens).collectorInfo(collectorInfo)
+ .build();
}
@Public
@@ -147,14 +153,15 @@ public abstract class AllocateResponse {
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
Resource availResources, AMCommand command, int numClusterNodes,
PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken,
- List<UpdatedContainer> updatedContainers) {
+ List<UpdatedContainer> updatedContainers, CollectorInfo collectorInfo) {
return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes)
.responseId(responseId)
.completedContainersStatuses(completedContainers)
.allocatedContainers(allocatedContainers).updatedNodes(updatedNodes)
.availableResources(availResources).amCommand(command)
.preemptionMessage(preempt).nmTokens(nmTokens)
- .updatedContainers(updatedContainers).amRmToken(amRMToken).build();
+ .updatedContainers(updatedContainers).amRmToken(amRMToken)
+ .collectorInfo(collectorInfo).build();
}
/**
@@ -346,6 +353,20 @@ public abstract class AllocateResponse {
public abstract void setApplicationPriority(Priority priority);
/**
+ * The data associated with the collector that belongs to this app. Contains
+ * address and token along with identification information.
+ *
+ * @return The data of the collector that belongs to this attempt
+ */
+ @Public
+ @Unstable
+ public abstract CollectorInfo getCollectorInfo();
+
+ @Private
+ @Unstable
+ public abstract void setCollectorInfo(CollectorInfo info);
+
+ /**
* Get the list of container update errors to inform the
* Application Master about the container updates that could not be
* satisfied due to error.
@@ -544,6 +565,50 @@ public abstract class AllocateResponse {
}
/**
+ * Set the <code>applicationPriority</code> of the response.
+ * @see AllocateResponse#setApplicationPriority(Priority)
+ * @param applicationPriority
+ * <code>applicationPriority</code> of the response
+ * @return {@link AllocateResponseBuilder}
+ */
+ @Private
+ @Unstable
+ public AllocateResponseBuilder applicationPriority(
+ Priority applicationPriority) {
+ allocateResponse.setApplicationPriority(applicationPriority);
+ return this;
+ }
+
+ /**
+ * Set the <code>collectorInfo</code> of the response.
+ * @see AllocateResponse#setCollectorInfo(CollectorInfo)
+ * @param collectorInfo <code>collectorInfo</code> of the response which
+ * contains the collector address and the collector token.
+ * @return {@link AllocateResponseBuilder}
+ */
+ @Private
+ @Unstable
+ public AllocateResponseBuilder collectorInfo(
+ CollectorInfo collectorInfo) {
+ allocateResponse.setCollectorInfo(collectorInfo);
+ return this;
+ }
+
+ /**
+ * Set the <code>updateErrors</code> of the response.
+ * @see AllocateResponse#setUpdateErrors(List)
+ * @param updateErrors <code>updateErrors</code> of the response
+ * @return {@link AllocateResponseBuilder}
+ */
+ @Private
+ @Unstable
+ public AllocateResponseBuilder updateErrors(
+ List<UpdateContainerError> updateErrors) {
+ allocateResponse.setUpdateErrors(updateErrors);
+ return this;
+ }
+
+ /**
* Return generated {@link AllocateResponse} object.
* @return {@link AllocateResponse}
*/
@@ -568,17 +633,4 @@ public abstract class AllocateResponse {
@Deprecated
public abstract List<ContainerResourceDecrease> getDecreasedContainers();
- /**
- * The address of collector that belong to this app
- *
- * @return The address of collector that belong to this attempt
- */
- @Public
- @Unstable
- public abstract String getCollectorAddr();
-
- @Private
- @Unstable
- public abstract void setCollectorAddr(String collectorAddr);
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java
new file mode 100644
index 0000000..960c992
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Collector info containing collector address and collector token passed from
+ * RM to AM in Allocate Response.
+ */
+@Private
+@InterfaceStability.Unstable
+public abstract class CollectorInfo {
+
+ protected static final long DEFAULT_TIMESTAMP_VALUE = -1;
+
+ public static CollectorInfo newInstance(String collectorAddr, Token token) {
+ CollectorInfo amCollectorInfo =
+ Records.newRecord(CollectorInfo.class);
+ amCollectorInfo.setCollectorAddr(collectorAddr);
+ amCollectorInfo.setCollectorToken(token);
+ return amCollectorInfo;
+ }
+
+ public abstract String getCollectorAddr();
+
+ public abstract void setCollectorAddr(String addr);
+
+ /**
+ * Get delegation token for app collector which AM will use to publish
+ * entities.
+ * @return the delegation token for app collector.
+ */
+ public abstract Token getCollectorToken();
+
+ public abstract void setCollectorToken(Token token);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 4af5a97..59d9141 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -635,3 +635,8 @@ message ContainerResourceDecreaseProto {
optional ContainerIdProto container_id = 1;
optional ResourceProto capability = 2;
}
+
+message CollectorInfoProto {
+ optional string collector_addr = 1;
+ optional hadoop.common.TokenProto collector_token = 2;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index 2a668bd..a212a7f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -114,7 +114,7 @@ message AllocateResponseProto {
repeated ContainerResourceDecreaseProto decreased_containers = 11;
optional hadoop.common.TokenProto am_rm_token = 12;
optional PriorityProto application_priority = 13;
- optional string collector_addr = 14;
+ optional CollectorInfoProto collector_info = 14;
repeated UpdateContainerErrorProto update_errors = 15;
repeated UpdatedContainerProto updated_containers = 16;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index 8b2557c..eb94b28 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -327,7 +327,11 @@ extends AMRMClientAsync<T> {
AllocateResponse response = (AllocateResponse) object;
- String collectorAddress = response.getCollectorAddr();
+ String collectorAddress = null;
+ if (response.getCollectorInfo() != null) {
+ collectorAddress = response.getCollectorInfo().getCollectorAddr();
+ }
+
TimelineV2Client timelineClient =
client.getRegisteredTimelineV2Client();
if (timelineClient != null && collectorAddress != null
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ApplicationMasterServiceProtoTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ApplicationMasterServiceProtoTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ApplicationMasterServiceProtoTestBase.java
new file mode 100644
index 0000000..4521018
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ApplicationMasterServiceProtoTestBase.java
@@ -0,0 +1,72 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.junit.After;
+
+/**
+ * Test Base for Application Master Service Protocol.
+ */
+public abstract class ApplicationMasterServiceProtoTestBase
+ extends ProtocolHATestBase {
+
+ private ApplicationMasterProtocol amClient;
+ private ApplicationAttemptId attemptId;
+
+ protected void startupHAAndSetupClient() throws Exception {
+ attemptId = this.cluster.createFakeApplicationAttemptId();
+
+ Token<AMRMTokenIdentifier> appToken =
+ this.cluster.getResourceManager().getRMContext()
+ .getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId);
+ appToken.setService(ClientRMProxy.getAMRMTokenService(this.conf));
+ UserGroupInformation.setLoginUser(UserGroupInformation
+ .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
+ UserGroupInformation.getCurrentUser().addToken(appToken);
+ syncToken(appToken);
+ amClient = ClientRMProxy
+ .createRMProxy(this.conf, ApplicationMasterProtocol.class);
+ }
+
+ @After
+ public void shutDown() {
+ if(this.amClient != null) {
+ RPC.stopProxy(this.amClient);
+ }
+ }
+
+ protected ApplicationMasterProtocol getAMClient() {
+ return amClient;
+ }
+
+ private void syncToken(Token<AMRMTokenIdentifier> token) throws IOException {
+ for (int i = 0; i < this.cluster.getNumOfResourceManager(); i++) {
+ this.cluster.getResourceManager(i).getRMContext()
+ .getAMRMTokenSecretManager().addPersistedPassword(token);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
index a8e9132..efb1987 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -98,6 +99,7 @@ import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@@ -804,11 +806,21 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
}
public AllocateResponse createFakeAllocateResponse() {
- return AllocateResponse.newInstance(-1,
- new ArrayList<ContainerStatus>(),
- new ArrayList<Container>(), new ArrayList<NodeReport>(),
- Resource.newInstance(1024, 2), null, 1,
- null, new ArrayList<NMToken>());
+ if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
+ return AllocateResponse.newInstance(-1,
+ new ArrayList<ContainerStatus>(), new ArrayList<Container>(),
+ new ArrayList<NodeReport>(), Resource.newInstance(1024, 2), null, 1,
+ null, new ArrayList<NMToken>(), null,
+ new ArrayList<UpdatedContainer>(),
+ CollectorInfo.newInstance("host:port", Token.newInstance(
+ new byte[] {0}, "TIMELINE", new byte[] {0}, "rm")));
+ } else {
+ return AllocateResponse.newInstance(-1,
+ new ArrayList<ContainerStatus>(),
+ new ArrayList<Container>(), new ArrayList<NodeReport>(),
+ Resource.newInstance(1024, 2), null, 1,
+ null, new ArrayList<NMToken>());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolForTimelineV2.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolForTimelineV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolForTimelineV2.java
new file mode 100644
index 0000000..be8c302
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolForTimelineV2.java
@@ -0,0 +1,71 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests Application Master Protocol with timeline service v2 enabled.
+ */
+public class TestApplicationMasterServiceProtocolForTimelineV2
+ extends ApplicationMasterServiceProtoTestBase {
+
+ @Before
+ public void initialize() throws Exception {
+ HATestUtil.setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE + 200, conf);
+ HATestUtil.setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE + 200, conf);
+ conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+ conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+ conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+ FileSystemTimelineWriterImpl.class, TimelineWriter.class);
+ startHACluster(0, false, false, true);
+ super.startupHAAndSetupClient();
+ }
+
+ @Test(timeout = 15000)
+ public void testAllocateForTimelineV2OnHA()
+ throws YarnException, IOException {
+ AllocateRequest request = AllocateRequest.newInstance(0, 50f,
+ new ArrayList<ResourceRequest>(),
+ new ArrayList<ContainerId>(),
+ ResourceBlacklistRequest.newInstance(new ArrayList<String>(),
+ new ArrayList<String>()));
+ AllocateResponse response = getAMClient().allocate(request);
+ Assert.assertEquals(response, this.cluster.createFakeAllocateResponse());
+ Assert.assertNotNull(response.getCollectorInfo());
+ Assert.assertEquals("host:port",
+ response.getCollectorInfo().getCollectorAddr());
+ Assert.assertNotNull(response.getCollectorInfo().getCollectorToken());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java
index ad86fb3..c2f39a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java
@@ -23,10 +23,6 @@ import java.util.ArrayList;
import org.junit.Assert;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -34,45 +30,20 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRespons
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestApplicationMasterServiceProtocolOnHA
- extends ProtocolHATestBase {
- private ApplicationMasterProtocol amClient;
- private ApplicationAttemptId attemptId ;
-
+ extends ApplicationMasterServiceProtoTestBase {
@Before
public void initialize() throws Exception {
startHACluster(0, false, false, true);
- attemptId = this.cluster.createFakeApplicationAttemptId();
-
- Token<AMRMTokenIdentifier> appToken =
- this.cluster.getResourceManager().getRMContext()
- .getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId);
- appToken.setService(ClientRMProxy.getAMRMTokenService(this.conf));
- UserGroupInformation.setLoginUser(UserGroupInformation
- .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
- UserGroupInformation.getCurrentUser().addToken(appToken);
- syncToken(appToken);
-
- amClient = ClientRMProxy
- .createRMProxy(this.conf, ApplicationMasterProtocol.class);
- }
-
- @After
- public void shutDown() {
- if(this.amClient != null) {
- RPC.stopProxy(this.amClient);
- }
+ super.startupHAAndSetupClient();
}
@Test(timeout = 15000)
@@ -81,7 +52,7 @@ public class TestApplicationMasterServiceProtocolOnHA
RegisterApplicationMasterRequest request =
RegisterApplicationMasterRequest.newInstance("localhost", 0, "");
RegisterApplicationMasterResponse response =
- amClient.registerApplicationMaster(request);
+ getAMClient().registerApplicationMaster(request);
Assert.assertEquals(response,
this.cluster.createFakeRegisterApplicationMasterResponse());
}
@@ -93,7 +64,7 @@ public class TestApplicationMasterServiceProtocolOnHA
FinishApplicationMasterRequest.newInstance(
FinalApplicationStatus.SUCCEEDED, "", "");
FinishApplicationMasterResponse response =
- amClient.finishApplicationMaster(request);
+ getAMClient().finishApplicationMaster(request);
Assert.assertEquals(response,
this.cluster.createFakeFinishApplicationMasterResponse());
}
@@ -105,14 +76,7 @@ public class TestApplicationMasterServiceProtocolOnHA
new ArrayList<ContainerId>(),
ResourceBlacklistRequest.newInstance(new ArrayList<String>(),
new ArrayList<String>()));
- AllocateResponse response = amClient.allocate(request);
+ AllocateResponse response = getAMClient().allocate(request);
Assert.assertEquals(response, this.cluster.createFakeAllocateResponse());
}
-
- private void syncToken(Token<AMRMTokenIdentifier> token) throws IOException {
- for (int i = 0; i < this.cluster.getNumOfResourceManager(); i++) {
- this.cluster.getResourceManager(i).getRMContext()
- .getAMRMTokenSecretManager().addPersistedPassword(token);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
index ba38340..9c64412 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
@@ -426,8 +426,8 @@ public class TestAMRMClientAsync {
}
AllocateResponse response =
AllocateResponse.newInstance(0, completed, allocated,
- new ArrayList<NodeReport>(), null, null, 1, null, nmTokens,
- updatedContainers);
+ new ArrayList<NodeReport>(), null, null, 1, null, nmTokens, null,
+ updatedContainers, null);
return response;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
index bd82016..61d16e9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
@@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+import org.apache.hadoop.yarn.api.records.impl.pb.CollectorInfoPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
@@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.UpdatedContainerPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.CollectorInfoProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
@@ -82,6 +85,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
private PreemptionMessage preempt;
private Token amrmToken = null;
private Priority appPriority = null;
+ private CollectorInfo collectorInfo = null;
public AllocateResponsePBImpl() {
builder = AllocateResponseProto.newBuilder();
@@ -164,6 +168,9 @@ public class AllocateResponsePBImpl extends AllocateResponse {
if (this.amrmToken != null) {
builder.setAmRmToken(convertToProtoFormat(this.amrmToken));
}
+ if (this.collectorInfo != null) {
+ builder.setCollectorInfo(convertToProtoFormat(this.collectorInfo));
+ }
if (this.appPriority != null) {
builder.setApplicationPriority(convertToProtoFormat(this.appPriority));
}
@@ -410,19 +417,25 @@ public class AllocateResponsePBImpl extends AllocateResponse {
@Override
- public synchronized String getCollectorAddr() {
+ public synchronized CollectorInfo getCollectorInfo() {
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
- return p.getCollectorAddr();
+ if (this.collectorInfo != null) {
+ return this.collectorInfo;
+ }
+ if (!p.hasCollectorInfo()) {
+ return null;
+ }
+ this.collectorInfo = convertFromProtoFormat(p.getCollectorInfo());
+ return this.collectorInfo;
}
@Override
- public synchronized void setCollectorAddr(String collectorAddr) {
+ public synchronized void setCollectorInfo(CollectorInfo info) {
maybeInitBuilder();
- if (collectorAddr == null) {
- builder.clearCollectorAddr();
- return;
+ if (info == null) {
+ builder.clearCollectorInfo();
}
- builder.setCollectorAddr(collectorAddr);
+ this.collectorInfo = info;
}
@Override
@@ -730,6 +743,16 @@ public class AllocateResponsePBImpl extends AllocateResponse {
return ((NodeReportPBImpl)t).getProto();
}
+ private synchronized CollectorInfoPBImpl convertFromProtoFormat(
+ CollectorInfoProto p) {
+ return new CollectorInfoPBImpl(p);
+ }
+
+ private synchronized CollectorInfoProto convertToProtoFormat(
+ CollectorInfo t) {
+ return ((CollectorInfoPBImpl)t).getProto();
+ }
+
private synchronized ContainerPBImpl convertFromProtoFormat(
ContainerProto p) {
return new ContainerPBImpl(p);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CollectorInfoPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CollectorInfoPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CollectorInfoPBImpl.java
new file mode 100644
index 0000000..bb54133
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CollectorInfoPBImpl.java
@@ -0,0 +1,148 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.proto.YarnProtos.CollectorInfoProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.CollectorInfoProtoOrBuilder;
+
+import com.google.protobuf.TextFormat;
+
+/**
+ * Protocol record implementation of {@link CollectorInfo}.
+ */
+public class CollectorInfoPBImpl extends CollectorInfo {
+
+ private CollectorInfoProto proto = CollectorInfoProto.getDefaultInstance();
+
+ private CollectorInfoProto.Builder builder = null;
+ private boolean viaProto = false;
+
+ private String collectorAddr = null;
+ private Token collectorToken = null;
+
+
+ public CollectorInfoPBImpl() {
+ builder = CollectorInfoProto.newBuilder();
+ }
+
+ public CollectorInfoPBImpl(CollectorInfoProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public CollectorInfoProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = CollectorInfoProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto) {
+ maybeInitBuilder();
+ }
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null) {
+ return false;
+ }
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ @Override
+ public String getCollectorAddr() {
+ CollectorInfoProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.collectorAddr == null && p.hasCollectorAddr()) {
+ this.collectorAddr = p.getCollectorAddr();
+ }
+ return this.collectorAddr;
+ }
+
+  @Override
+  public void setCollectorAddr(String addr) {
+    maybeInitBuilder();
+    if (addr == null) { // test the argument (not the cached field) so null clears the builder
+      builder.clearCollectorAddr();
+    }
+    this.collectorAddr = addr;
+  }
+
+ @Override
+ public Token getCollectorToken() {
+ CollectorInfoProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.collectorToken == null && p.hasCollectorToken()) {
+ this.collectorToken = convertFromProtoFormat(p.getCollectorToken());
+ }
+ return this.collectorToken;
+ }
+
+ @Override
+ public void setCollectorToken(Token token) {
+ maybeInitBuilder();
+ if (token == null) {
+ builder.clearCollectorToken();
+ }
+ this.collectorToken = token;
+ }
+
+ private TokenPBImpl convertFromProtoFormat(TokenProto p) {
+ return new TokenPBImpl(p);
+ }
+
+ private TokenProto convertToProtoFormat(Token t) {
+ return ((TokenPBImpl) t).getProto();
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.collectorAddr != null) {
+ builder.setCollectorAddr(this.collectorAddr);
+ }
+ if (this.collectorToken != null) {
+ builder.setCollectorToken(convertToProtoFormat(this.collectorToken));
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
index 932e078..bc657ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
@@ -100,6 +100,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestP
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -405,6 +406,7 @@ public class TestPBImplRecords extends BasePBImplRecordsTest {
generateByNewInstance(ApplicationTimeout.class);
generateByNewInstance(ContainerResourceIncreaseRequest.class);
generateByNewInstance(QueueConfigurations.class);
+ generateByNewInstance(CollectorInfo.class);
}
@Test
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java
index 1503eca..a4c1a38 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.util.Records;
@@ -37,11 +38,11 @@ public abstract class ReportNewCollectorInfoRequest {
}
public static ReportNewCollectorInfoRequest newInstance(
- ApplicationId id, String collectorAddr) {
+ ApplicationId id, String collectorAddr, Token token) {
ReportNewCollectorInfoRequest request =
Records.newRecord(ReportNewCollectorInfoRequest.class);
request.setAppCollectorsList(
- Arrays.asList(AppCollectorData.newInstance(id, collectorAddr)));
+ Arrays.asList(AppCollectorData.newInstance(id, collectorAddr, token)));
return request;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
index 73a8abe..c07a6eb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
@@ -26,23 +26,26 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
@@ -164,9 +167,13 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
builder.clearRegisteringCollectors();
for (Map.Entry<ApplicationId, AppCollectorData> entry :
registeringCollectors.entrySet()) {
+ AppCollectorData data = entry.getValue();
builder.addRegisteringCollectors(AppCollectorDataProto.newBuilder()
.setAppId(convertToProtoFormat(entry.getKey()))
- .setAppCollectorAddr(entry.getValue().getCollectorAddr()));
+ .setAppCollectorAddr(data.getCollectorAddr())
+ .setAppCollectorToken(convertToProtoFormat(data.getCollectorToken()))
+ .setRmIdentifier(data.getRMIdentifier())
+ .setVersion(data.getVersion()));
}
}
@@ -267,8 +274,10 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
this.registeringCollectors = new HashMap<>();
for (AppCollectorDataProto c : list) {
ApplicationId appId = convertFromProtoFormat(c.getAppId());
+ Token collectorToken = convertFromProtoFormat(c.getAppCollectorToken());
AppCollectorData data = AppCollectorData.newInstance(appId,
- c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion());
+ c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion(),
+ collectorToken);
this.registeringCollectors.put(appId, data);
}
}
@@ -309,6 +318,14 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
return ((MasterKeyPBImpl)t).getProto();
}
+ private TokenPBImpl convertFromProtoFormat(TokenProto p) {
+ return new TokenPBImpl(p);
+ }
+
+ private TokenProto convertToProtoFormat(Token t) {
+ return ((TokenPBImpl) t).getProto();
+ }
+
@Override
public Set<NodeLabel> getNodeLabels() {
initNodeLabels();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index bc4e802..3f4b4ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -26,32 +26,35 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
@@ -153,6 +156,8 @@ public class NodeHeartbeatResponsePBImpl extends
builder.addAppCollectors(AppCollectorDataProto.newBuilder()
.setAppId(convertToProtoFormat(entry.getKey()))
.setAppCollectorAddr(data.getCollectorAddr())
+ .setAppCollectorToken(
+ convertToProtoFormat(entry.getValue().getCollectorToken()))
.setRmIdentifier(data.getRMIdentifier())
.setVersion(data.getVersion()));
}
@@ -598,8 +603,10 @@ public class NodeHeartbeatResponsePBImpl extends
this.appCollectorsMap = new HashMap<>();
for (AppCollectorDataProto c : list) {
ApplicationId appId = convertFromProtoFormat(c.getAppId());
+ Token collectorToken = convertFromProtoFormat(c.getAppCollectorToken());
AppCollectorData data = AppCollectorData.newInstance(appId,
- c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion());
+ c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion(),
+ collectorToken);
this.appCollectorsMap.put(appId, data);
}
}
@@ -779,5 +786,13 @@ public class NodeHeartbeatResponsePBImpl extends
SignalContainerRequest t) {
return ((SignalContainerRequestPBImpl)t).getProto();
}
+
+ private TokenProto convertToProtoFormat(Token t) {
+ return ((TokenPBImpl) t).getProto();
+ }
+
+ private TokenPBImpl convertFromProtoFormat(TokenProto p) {
+ return new TokenPBImpl(p);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java
index 3f3dcf5..5ffc3a2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java
@@ -20,12 +20,12 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorDataPBImpl;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
-import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorDataPBImpl;
public class ReportNewCollectorInfoRequestPBImpl extends
ReportNewCollectorInfoRequest {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java
index da2e5de..5266dca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.util.Records;
@@ -31,20 +32,32 @@ public abstract class AppCollectorData {
protected static final long DEFAULT_TIMESTAMP_VALUE = -1;
public static AppCollectorData newInstance(
- ApplicationId id, String collectorAddr, long rmIdentifier, long version) {
+ ApplicationId id, String collectorAddr, long rmIdentifier, long version,
+ Token token) {
AppCollectorData appCollectorData =
Records.newRecord(AppCollectorData.class);
appCollectorData.setApplicationId(id);
appCollectorData.setCollectorAddr(collectorAddr);
appCollectorData.setRMIdentifier(rmIdentifier);
appCollectorData.setVersion(version);
+ appCollectorData.setCollectorToken(token);
return appCollectorData;
}
+ public static AppCollectorData newInstance(
+ ApplicationId id, String collectorAddr, long rmIdentifier, long version) {
+ return newInstance(id, collectorAddr, rmIdentifier, version, null);
+ }
+
public static AppCollectorData newInstance(ApplicationId id,
- String collectorAddr) {
+ String collectorAddr, Token token) {
return newInstance(id, collectorAddr, DEFAULT_TIMESTAMP_VALUE,
- DEFAULT_TIMESTAMP_VALUE);
+ DEFAULT_TIMESTAMP_VALUE, token);
+ }
+
+ public static AppCollectorData newInstance(ApplicationId id,
+ String collectorAddr) {
+ return newInstance(id, collectorAddr, null);
}
/**
@@ -101,4 +114,12 @@ public abstract class AppCollectorData {
public abstract void setVersion(long version);
+ /**
+ * Get delegation token for app collector which AM will use to publish
+ * entities.
+ * @return the delegation token for app collector.
+ */
+ public abstract Token getCollectorToken();
+
+ public abstract void setCollectorToken(Token token);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java
index 7d3a805..7144f51 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java
@@ -19,10 +19,11 @@ package org.apache.hadoop.yarn.server.api.records.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
-
+import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProtoOrBuilder;
@@ -43,6 +44,7 @@ public class AppCollectorDataPBImpl extends AppCollectorData {
private String collectorAddr = null;
private Long rmIdentifier = null;
private Long version = null;
+ private Token collectorToken = null;
public AppCollectorDataPBImpl() {
builder = AppCollectorDataProto.newBuilder();
@@ -158,6 +160,24 @@ public class AppCollectorDataPBImpl extends AppCollectorData {
builder.setVersion(version);
}
+ /**
+ * Returns the app collector token. If no token is cached in the local
+ * {@code collectorToken} field and the underlying protobuf message has one
+ * set, the message value is wrapped in a {@link TokenPBImpl} and cached
+ * before being returned.
+ *
+ * @return the cached or newly wrapped token, or {@code null} when no token
+ * is cached and the underlying message has none set.
+ */
+ @Override
+ public Token getCollectorToken() {
+ // Read from the proto or from the builder, depending on viaProto.
+ AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.collectorToken == null && p.hasAppCollectorToken()) {
+ this.collectorToken = new TokenPBImpl(p.getAppCollectorToken());
+ }
+ return this.collectorToken;
+ }
+
+ /**
+ * Stores the app collector token in the local {@code collectorToken} field
+ * after calling {@code maybeInitBuilder()}. Passing {@code null} also clears
+ * the token field on the builder.
+ *
+ * @param token the token to store; may be {@code null}.
+ */
+ @Override
+ public void setCollectorToken(Token token) {
+ maybeInitBuilder();
+ if (token == null) {
+ // Drop any token already present on the builder, not just the local copy.
+ builder.clearAppCollectorToken();
+ }
+ this.collectorToken = token;
+ }
+
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
return new ApplicationIdPBImpl(p);
}
@@ -195,6 +215,9 @@ public class AppCollectorDataPBImpl extends AppCollectorData {
if (this.version != null) {
builder.setVersion(this.version);
}
+ if (this.collectorToken != null) {
+ builder.setAppCollectorToken(
+ ((TokenPBImpl)this.collectorToken).getProto());
+ }
}
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 7dbae4e..02bb76f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -22,6 +22,7 @@ option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;
+import "Security.proto";
import "yarn_protos.proto";
import "yarn_server_common_protos.proto";
import "yarn_service_protos.proto";
@@ -136,6 +137,7 @@ message AppCollectorDataProto {
optional string app_collector_addr = 2;
optional int64 rm_identifier = 3 [default = -1];
optional int64 version = 4 [default = -1];
+ optional hadoop.common.TokenProto app_collector_token = 5;
}
//////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
index 7eb8944..45d2cb2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
@@ -70,6 +71,7 @@ import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
@@ -91,6 +93,21 @@ public class TestRPC {
"collectors' number in ReportNewCollectorInfoRequest is not ONE.";
public static final String DEFAULT_COLLECTOR_ADDR = "localhost:0";
+ /**
+ * Collector token used by the collector RPC checks in this class. It is
+ * built once, in the static initializer below, from a
+ * TimelineDelegationTokenIdentifier with a fixed owner, renewer, real user,
+ * master key id and sequence number. Only the issue date and max date
+ * depend on the time at which this class is initialized.
+ */
+ private static final Token DEFAULT_COLLECTOR_TOKEN;
+ static {
+ TimelineDelegationTokenIdentifier identifier =
+ new TimelineDelegationTokenIdentifier();
+ identifier.setOwner(new Text("user"));
+ identifier.setRenewer(new Text("user"));
+ identifier.setRealUser(new Text("user"));
+ long now = Time.now();
+ identifier.setIssueDate(now);
+ // Max date is the issue date plus 1000 (presumably milliseconds -- confirm
+ // against Time.now()).
+ identifier.setMaxDate(now + 1000L);
+ identifier.setMasterKeyId(500);
+ identifier.setSequenceNumber(5);
+ // The identifier bytes are passed as both the first and the third argument
+ // (presumably identifier and password -- confirm against Token.newInstance).
+ DEFAULT_COLLECTOR_TOKEN = Token.newInstance(identifier.getBytes(),
+ identifier.getKind().toString(), identifier.getBytes(), "localhost:0");
+ }
public static final ApplicationId DEFAULT_APP_ID =
ApplicationId.newInstance(0, 0);
@@ -171,7 +188,16 @@ public class TestRPC {
try {
ReportNewCollectorInfoRequest request =
ReportNewCollectorInfoRequest.newInstance(
- DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR);
+ DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR, null);
+ proxy.reportNewCollectorInfo(request);
+ } catch (YarnException e) {
+ Assert.fail("RPC call failured is not expected here.");
+ }
+
+ try {
+ ReportNewCollectorInfoRequest request =
+ ReportNewCollectorInfoRequest.newInstance(
+ DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR, DEFAULT_COLLECTOR_TOKEN);
proxy.reportNewCollectorInfo(request);
} catch (YarnException e) {
Assert.fail("RPC call failured is not expected here.");
@@ -428,6 +454,8 @@ public class TestRPC {
DEFAULT_APP_ID);
Assert.assertEquals(appCollector.getCollectorAddr(),
DEFAULT_COLLECTOR_ADDR);
+ Assert.assertTrue(appCollector.getCollectorToken() == null ||
+ appCollector.getCollectorToken().equals(DEFAULT_COLLECTOR_TOKEN));
} else {
throw new YarnException(ILLEGAL_NUMBER_MESSAGE);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
index ffd1b7e..008f1ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
@@ -351,7 +352,8 @@ public class TestYarnServerApiClasses {
private Map<ApplicationId, AppCollectorData> getCollectors() {
ApplicationId appID = ApplicationId.newInstance(1L, 1);
String collectorAddr = "localhost:0";
- AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr);
+ AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr,
+ Token.newInstance(new byte[0], "kind", new byte[0], "s"));
Map<ApplicationId, AppCollectorData> collectorMap =
new HashMap<>();
collectorMap.put(appID, data);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 7a9f82f..61f1d75 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -71,7 +71,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
-
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index aafb8d7..32fda01 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -44,8 +44,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
index 499a5cb..dffd19e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
@@ -305,7 +305,7 @@ public class MockResourceManagerFacade implements
new ArrayList<ContainerStatus>(), containerList,
new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null,
new ArrayList<NMToken>(), newAMRMToken,
- new ArrayList<UpdatedContainer>());
+ new ArrayList<UpdatedContainer>(), null);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 5fe90fc..57ef493 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -48,8 +48,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-
-
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -81,7 +80,6 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
@@ -640,9 +638,9 @@ public class ApplicationMasterService extends AbstractService implements
// add collector address for this application
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
- AppCollectorData data = app.getCollectorData();
- if (data != null) {
- allocateResponse.setCollectorAddr(data.getCollectorAddr());
+ CollectorInfo collectorInfo = app.getCollectorInfo();
+ if (collectorInfo != null) {
+ allocateResponse.setCollectorInfo(collectorInfo);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index 1a0b920..93c41b6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -187,14 +188,24 @@ public interface RMApp extends EventHandler<RMAppEvent> {
* only if the timeline service v.2 is enabled.
*
* @return the data for the application's collector, including collector
- * address, collector ID. Return null if the timeline service v.2 is not
- * enabled.
+ * address, RM ID, version and collector token. Return null if the timeline
+ * service v.2 is not enabled.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
AppCollectorData getCollectorData();
/**
+ * The timeline collector information to be sent to AM. It should be used
+ * only if the timeline service v.2 is enabled.
+ *
+ * @return collector info, including collector address and collector token.
+ * Return null if the timeline service v.2 is not enabled.
+ */
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ CollectorInfo getCollectorInfo();
+ /**
* The original tracking url for the application master.
* @return the original tracking url for the application master.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index fe0237e..623f564 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -167,6 +168,7 @@ public class RMAppImpl implements RMApp, Recoverable {
private int firstAttemptIdInStateStore = 1;
private int nextAttemptId = 1;
private AppCollectorData collectorData;
+ private CollectorInfo collectorInfo;
// This field isn't protected by readlock now.
private volatile RMAppAttempt currentAttempt;
private String queue;
@@ -528,7 +530,7 @@ public class RMAppImpl implements RMApp, Recoverable {
*/
public void startTimelineCollector() {
AppLevelTimelineCollector collector =
- new AppLevelTimelineCollector(applicationId);
+ new AppLevelTimelineCollector(applicationId, user);
rmContext.getRMTimelineCollectorManager().putIfAbsent(
applicationId, collector);
}
@@ -616,6 +618,12 @@ public class RMAppImpl implements RMApp, Recoverable {
+ /**
+ * Stores the collector data for this application and derives the
+ * {@link CollectorInfo} from its collector address and collector token.
+ *
+ * NOTE(review): incomingData is dereferenced without a null check; confirm
+ * that callers never pass null.
+ *
+ * @param incomingData the collector data reported for this application.
+ */
public void setCollectorData(AppCollectorData incomingData) {
this.collectorData = incomingData;
+ this.collectorInfo = CollectorInfo.newInstance(
+ incomingData.getCollectorAddr(), incomingData.getCollectorToken());
+ }
+
+ /**
+ * Returns the collector info held in the {@code collectorInfo} field.
+ *
+ * @return the stored collector info; {@code null} if none has been stored.
+ */
+ @Override
+ public CollectorInfo getCollectorInfo() {
+ return this.collectorInfo;
}
public void removeCollectorData() {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org