You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/06/16 05:12:32 UTC
svn commit: r1493450 - in
/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protoco...
Author: vinodkv
Date: Sun Jun 16 03:12:31 2013
New Revision: 1493450
URL: http://svn.apache.org/r1493450
Log:
YARN-693. Modified RM to send NMTokens on allocate call so that AMs can then use them for authentication with NMs. Contributed by Omkar Vinit Joshi.
svn merge --ignore-ancestry -c 1493448 ../../trunk/
Added:
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java
- copied unchanged from r1493448, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java
- copied unchanged from r1493448, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java
Modified:
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseContainerTokenSecretManager.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt Sun Jun 16 03:12:31 2013
@@ -314,6 +314,9 @@ Release 2.1.0-beta - UNRELEASED
YARN-639. Modified Distributed Shell application to start using the new
NMClient library. (Zhijie Shen via vinodkv)
+ YARN-693. Modified RM to send NMTokens on allocate call so that AMs can then
+ use them for authentication with NMs. (Omkar Vinit Joshi via vinodkv)
+
OPTIMIZATIONS
YARN-512. Log aggregation root directory check is more expensive than it
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java Sun Jun 16 03:12:31 2013
@@ -29,10 +29,10 @@ import org.apache.hadoop.yarn.api.AMRMPr
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.util.Records;
/**
@@ -67,7 +67,7 @@ public abstract class AllocateResponse {
List<ContainerStatus> completedContainers,
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
Resource availResources, AMCommand command, int numClusterNodes,
- PreemptionMessage preempt) {
+ PreemptionMessage preempt, List<NMToken> nmTokens) {
AllocateResponse response = Records.newRecord(AllocateResponse.class);
response.setNumClusterNodes(numClusterNodes);
response.setResponseId(responseId);
@@ -77,6 +77,7 @@ public abstract class AllocateResponse {
response.setAvailableResources(availResources);
response.setAMCommand(command);
response.setPreemptionMessage(preempt);
+ response.setNMTokens(nmTokens);
return response;
}
@@ -202,7 +203,7 @@ public abstract class AllocateResponse {
@Public
@Stable
- public abstract void setNMTokens(List<Token> nmTokens);
+ public abstract void setNMTokens(List<NMToken> nmTokens);
/**
* Get the list of NMTokens required for communicating with NM. New NMTokens
@@ -217,6 +218,6 @@ public abstract class AllocateResponse {
*/
@Public
@Stable
- public abstract List<Token> getNMTokens();
+ public abstract List<NMToken> getNMTokens();
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java Sun Jun 16 03:12:31 2013
@@ -23,21 +23,20 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
@@ -46,6 +45,7 @@ import org.apache.hadoop.yarn.proto.Yarn
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder;
import org.apache.hadoop.yarn.util.ProtoUtils;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto;
public class AllocateResponsePBImpl extends AllocateResponse {
@@ -56,7 +56,7 @@ public class AllocateResponsePBImpl exte
Resource limit;
private List<Container> allocatedContainers = null;
- private List<Token> nmTokens = null;
+ private List<NMToken> nmTokens = null;
private List<ContainerStatus> completedContainersStatuses = null;
private List<NodeReport> updatedNodes = null;
@@ -108,7 +108,7 @@ public class AllocateResponsePBImpl exte
}
if (nmTokens != null) {
builder.clearNmTokens();
- Iterable<TokenProto> iterable = getTokenProtoIterable(nmTokens);
+ Iterable<NMTokenProto> iterable = getTokenProtoIterable(nmTokens);
builder.addAllNmTokens(iterable);
}
if (this.completedContainersStatuses != null) {
@@ -248,9 +248,11 @@ public class AllocateResponsePBImpl exte
}
@Override
- public synchronized void setNMTokens(List<Token> nmTokens) {
+ public synchronized void setNMTokens(List<NMToken> nmTokens) {
if (nmTokens == null || nmTokens.isEmpty()) {
- this.nmTokens.clear();
+ if (this.nmTokens != null) {
+ this.nmTokens.clear();
+ }
builder.clearNmTokens();
return;
}
@@ -260,7 +262,7 @@ public class AllocateResponsePBImpl exte
}
@Override
- public synchronized List<Token> getNMTokens() {
+ public synchronized List<NMToken> getNMTokens() {
initLocalNewNMTokenList();
return nmTokens;
}
@@ -334,9 +336,9 @@ public class AllocateResponsePBImpl exte
return;
}
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
- List<TokenProto> list = p.getNmTokensList();
- nmTokens = new ArrayList<Token>();
- for (TokenProto t : list) {
+ List<NMTokenProto> list = p.getNmTokensList();
+ nmTokens = new ArrayList<NMToken>();
+ for (NMTokenProto t : list) {
nmTokens.add(convertFromProtoFormat(t));
}
}
@@ -372,15 +374,15 @@ public class AllocateResponsePBImpl exte
};
}
- private synchronized Iterable<TokenProto> getTokenProtoIterable(
- final List<Token> nmTokenList) {
+ private synchronized Iterable<NMTokenProto> getTokenProtoIterable(
+ final List<NMToken> nmTokenList) {
maybeInitBuilder();
- return new Iterable<TokenProto>() {
+ return new Iterable<NMTokenProto>() {
@Override
- public synchronized Iterator<TokenProto> iterator() {
- return new Iterator<TokenProto>() {
+ public synchronized Iterator<NMTokenProto> iterator() {
+ return new Iterator<NMTokenProto>() {
- Iterator<Token> iter = nmTokenList.iterator();
+ Iterator<NMToken> iter = nmTokenList.iterator();
@Override
public boolean hasNext() {
@@ -388,7 +390,7 @@ public class AllocateResponsePBImpl exte
}
@Override
- public TokenProto next() {
+ public NMTokenProto next() {
return convertToProtoFormat(iter.next());
}
@@ -524,11 +526,11 @@ public class AllocateResponsePBImpl exte
return ((PreemptionMessagePBImpl)r).getProto();
}
- private synchronized TokenProto convertToProtoFormat(Token token) {
- return ((TokenPBImpl)token).getProto();
+ private synchronized NMTokenProto convertToProtoFormat(NMToken token) {
+ return ((NMTokenPBImpl)token).getProto();
}
- private synchronized Token convertFromProtoFormat(TokenProto proto) {
- return new TokenPBImpl(proto);
+ private synchronized NMToken convertFromProtoFormat(NMTokenProto proto) {
+ return new NMTokenPBImpl(proto);
}
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto Sun Jun 16 03:12:31 2013
@@ -59,6 +59,11 @@ message AllocateRequestProto {
optional float progress = 6;
}
+message NMTokenProto {
+ optional NodeIdProto nodeId = 1;
+ optional hadoop.common.TokenProto token = 2;
+}
+
message AllocateResponseProto {
optional AMCommandProto a_m_command = 1;
optional int32 response_id = 2;
@@ -68,7 +73,7 @@ message AllocateResponseProto {
repeated NodeReportProto updated_nodes = 6;
optional int32 num_cluster_nodes = 7;
optional PreemptionMessageProto preempt = 8;
- repeated hadoop.common.TokenProto nm_tokens = 9;
+ repeated NMTokenProto nm_tokens = 9;
}
//////////////////////////////////////////////////////
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java Sun Jun 16 03:12:31 2013
@@ -21,15 +21,17 @@ package org.apache.hadoop.yarn.client;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.Service;
@@ -208,4 +210,13 @@ public interface AMRMClient<T extends AM
String resourceName,
Resource capability);
+ /**
+ * It returns the NMToken received on allocate call. It will not communicate
+ * with RM to get NMTokens. On allocate call whenever we receive new token
+ * along with container AMRMClient will cache this NMToken per node manager.
+ * This map returned should be shared with any application which is
+ * communicating with NodeManager (ex. NMClient) using NMTokens. If a new
+ * NMToken is received for the same node manager then it will be replaced.
+ */
+ public ConcurrentMap<String, Token> getNMTokens();
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java Sun Jun 16 03:12:31 2013
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
@@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -260,6 +262,19 @@ public class AMRMClientAsync<T extends C
public int getClusterNodeCount() {
return client.getClusterNodeCount();
}
+
+ /**
+ * It returns the NMToken received on allocate call. It will not communicate
+ * with RM to get NMTokens. On allocate call whenever we receive new token
+ * along with new container AMRMClientAsync will cache this NMToken per node
+ * manager. This map returned should be shared with any application which is
+ * communicating with NodeManager (ex. NMClient / NMClientAsync) using
+ * NMTokens. If a new NMToken is received for the same node manager
+ * then it will be replaced.
+ */
+ public ConcurrentMap<String, Token> getNMTokens() {
+ return client.getNMTokens();
+ }
private class HeartbeatThread extends Thread {
public HeartbeatThread() {
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java Sun Jun 16 03:12:31 2013
@@ -33,9 +33,11 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
@@ -49,9 +51,11 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -61,6 +65,8 @@ import org.apache.hadoop.yarn.factory.pr
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.service.AbstractService;
+import com.google.common.annotations.VisibleForTesting;
+
// TODO check inputs for null etc. YARN-654
@Unstable
@@ -73,6 +79,7 @@ public class AMRMClientImpl<T extends Co
RecordFactoryProvider.getRecordFactory(null);
private int lastResponseId = 0;
+ private ConcurrentHashMap<String, Token> nmTokens;
protected AMRMProtocol rmClient;
protected final ApplicationAttemptId appAttemptId;
@@ -148,6 +155,7 @@ public class AMRMClientImpl<T extends Co
public AMRMClientImpl(ApplicationAttemptId appAttemptId) {
super(AMRMClientImpl.class.getName());
this.appAttemptId = appAttemptId;
+ this.nmTokens = new ConcurrentHashMap<String, Token>();
}
@Override
@@ -238,6 +246,9 @@ public class AMRMClientImpl<T extends Co
clusterNodeCount = allocateResponse.getNumClusterNodes();
lastResponseId = allocateResponse.getResponseId();
clusterAvailableResources = allocateResponse.getAvailableResources();
+ if (!allocateResponse.getNMTokens().isEmpty()) {
+ populateNMTokens(allocateResponse);
+ }
}
} finally {
// TODO how to differentiate remote yarn exception vs error in rpc
@@ -265,6 +276,20 @@ public class AMRMClientImpl<T extends Co
return allocateResponse;
}
+ @Private
+ @VisibleForTesting
+ protected void populateNMTokens(AllocateResponse allocateResponse) {
+ for (NMToken token : allocateResponse.getNMTokens()) {
+ String nodeId = token.getNodeId().toString();
+ if (nmTokens.containsKey(nodeId)) {
+ LOG.debug("Replacing token for : " + nodeId);
+ } else {
+ LOG.debug("Received new token for : " + nodeId);
+ }
+ nmTokens.put(nodeId, token.getToken());
+ }
+ }
+
@Override
public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
String appMessage, String appTrackingUrl) throws YarnException,
@@ -512,4 +537,8 @@ public class AMRMClientImpl<T extends Co
}
}
+ @Override
+ public ConcurrentHashMap<String, Token> getNMTokens() {
+ return nmTokens;
+ }
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java Sun Jun 16 03:12:31 2013
@@ -25,9 +25,14 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+
+import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.AMRMProtocol;
@@ -49,6 +54,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.AMRMClient.StoredContainerRequest;
@@ -437,6 +443,11 @@ public class TestAMRMClient {
int allocatedContainerCount = 0;
int iterationsLeft = 2;
Set<ContainerId> releases = new TreeSet<ContainerId>();
+
+ ConcurrentHashMap<String, Token> nmTokens = amClient.getNMTokens();
+ Assert.assertEquals(0, nmTokens.size());
+ HashMap<String, Token> receivedNMTokens = new HashMap<String, Token>();
+
while (allocatedContainerCount < containersRequestedAny
&& iterationsLeft-- > 0) {
AllocateResponse allocResponse = amClient.allocate(0.1f);
@@ -450,12 +461,32 @@ public class TestAMRMClient {
releases.add(rejectContainerId);
amClient.releaseAssignedContainer(rejectContainerId);
}
+ Assert.assertEquals(nmTokens.size(), amClient.getNMTokens().size());
+ Iterator<String> nodeI = nmTokens.keySet().iterator();
+ while (nodeI.hasNext()) {
+ String nodeId = nodeI.next();
+ if (!receivedNMTokens.containsKey(nodeId)) {
+ receivedNMTokens.put(nodeId, nmTokens.get(nodeId));
+ } else {
+ Assert.fail("Received token again for : " + nodeId);
+ }
+ }
+ nodeI = receivedNMTokens.keySet().iterator();
+ while (nodeI.hasNext()) {
+ nmTokens.remove(nodeI.next());
+ }
+
if(allocatedContainerCount < containersRequestedAny) {
// sleep to let NM's heartbeat to RM and trigger allocations
sleep(1000);
}
}
-
+
+ Assert.assertEquals(0, amClient.getNMTokens().size());
+ // Should receive atleast 1 token
+ Assert.assertTrue(receivedNMTokens.size() > 0
+ && receivedNMTokens.size() <= nodeCount);
+
assertTrue(allocatedContainerCount == containersRequestedAny);
assertTrue(amClient.release.size() == 2);
assertTrue(amClient.ask.size() == 0);
@@ -523,7 +554,6 @@ public class TestAMRMClient {
sleep(1000);
}
}
-
assertTrue(amClient.ask.size() == 0);
assertTrue(amClient.release.size() == 0);
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java Sun Jun 16 03:12:31 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
@@ -66,11 +67,11 @@ public class TestAMRMClientAsync {
List<Container> allocated1 = Arrays.asList(
Container.newInstance(null, null, null, null, null, null));
final AllocateResponse response1 = createAllocateResponse(
- new ArrayList<ContainerStatus>(), allocated1);
+ new ArrayList<ContainerStatus>(), allocated1, null);
final AllocateResponse response2 = createAllocateResponse(completed1,
- new ArrayList<Container>());
+ new ArrayList<Container>(), null);
final AllocateResponse emptyResponse = createAllocateResponse(
- new ArrayList<ContainerStatus>(), new ArrayList<Container>());
+ new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
TestCallbackHandler callbackHandler = new TestCallbackHandler();
final AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
@@ -146,7 +147,7 @@ public class TestAMRMClientAsync {
Assert.assertEquals(null, callbackHandler.takeAllocatedContainers());
Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
}
-
+
@Test(timeout=10000)
public void testAMRMClientAsyncException() throws Exception {
Configuration conf = new Configuration();
@@ -189,7 +190,7 @@ public class TestAMRMClientAsync {
AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
final AllocateResponse rebootResponse = createAllocateResponse(
- new ArrayList<ContainerStatus>(), new ArrayList<Container>());
+ new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
rebootResponse.setAMCommand(AMCommand.AM_RESYNC);
when(client.allocate(anyFloat())).thenReturn(rebootResponse);
@@ -215,9 +216,11 @@ public class TestAMRMClientAsync {
}
private AllocateResponse createAllocateResponse(
- List<ContainerStatus> completed, List<Container> allocated) {
- AllocateResponse response = AllocateResponse.newInstance(0, completed, allocated,
- new ArrayList<NodeReport>(), null, null, 1, null);
+ List<ContainerStatus> completed, List<Container> allocated,
+ List<NMToken> nmTokens) {
+ AllocateResponse response =
+ AllocateResponse.newInstance(0, completed, allocated,
+ new ArrayList<NodeReport>(), null, null, 1, null, nmTokens);
return response;
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java Sun Jun 16 03:12:31 2013
@@ -21,15 +21,19 @@ package org.apache.hadoop.yarn.security;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
public class NMTokenIdentifier extends TokenIdentifier {
@@ -106,5 +110,4 @@ public class NMTokenIdentifier extends T
public UserGroupInformation getUser() {
return UserGroupInformation.createRemoteUser(appAttemptId.toString());
}
-
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseContainerTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseContainerTokenSecretManager.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseContainerTokenSecretManager.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseContainerTokenSecretManager.java Sun Jun 16 03:12:31 2013
@@ -18,14 +18,11 @@
package org.apache.hadoop.yarn.server.security;
-import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import javax.crypto.SecretKey;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -34,7 +31,6 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
-import org.apache.hadoop.yarn.util.Records;
/**
* SecretManager for ContainerTokens. Extended by both RM and NM and hence is
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java Sun Jun 16 03:12:31 2013
@@ -114,39 +114,4 @@ public class BaseNMTokenSecretManager ex
public NMTokenIdentifier createIdentifier() {
return new NMTokenIdentifier();
}
-
- /**
- * Helper function for creating NMTokens.
- */
- public Token createNMToken(ApplicationAttemptId applicationAttemptId,
- NodeId nodeId, String applicationSubmitter) {
- byte[] password;
- NMTokenIdentifier identifier;
-
- this.readLock.lock();
- try {
- identifier =
- new NMTokenIdentifier(applicationAttemptId, nodeId,
- applicationSubmitter, this.currentMasterKey.getMasterKey()
- .getKeyId());
- password = this.createPassword(identifier);
- } finally {
- this.readLock.unlock();
- }
- return newNMToken(password, identifier);
- }
-
- public static Token newNMToken(byte[] password,
- NMTokenIdentifier identifier) {
- NodeId nodeId = identifier.getNodeId();
- // RPC layer client expects ip:port as service for tokens
- InetSocketAddress addr =
- NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort());
- Token nmToken =
- Token.newInstance(identifier.getBytes(),
- NMTokenIdentifier.KIND.toString(), password, SecurityUtil
- .buildTokenService(addr).toString());
- return nmToken;
-
- }
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Sun Jun 16 03:12:31 2013
@@ -373,6 +373,12 @@ public class ApplicationMasterService ex
// add preemption to the allocateResponse message (if any)
allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation));
+ // Adding NMTokens for allocated containers.
+ if (!allocation.getContainers().isEmpty()) {
+ allocateResponse.setNMTokens(rmContext.getNMTokenSecretManager()
+ .getNMTokens(app.getUser(), appAttemptId,
+ allocation.getContainers()));
+ }
return allocateResponse;
}
}
@@ -433,12 +439,15 @@ public class ApplicationMasterService ex
AllocateResponse response =
recordFactory.newRecordInstance(AllocateResponse.class);
response.setResponseId(0);
- LOG.info("Registering " + attemptId);
+ LOG.info("Registering app attempt : " + attemptId);
responseMap.put(attemptId, response);
+ rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId);
}
public void unregisterAttempt(ApplicationAttemptId attemptId) {
+ LOG.info("Unregistering app attempt : " + attemptId);
responseMap.remove(attemptId);
+ rmContext.getNMTokenSecretManager().unregisterApplicationAttempt(attemptId);
}
public void refreshServiceAcls(Configuration configuration,
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Sun Jun 16 03:12:31 2013
@@ -218,7 +218,9 @@ public class ResourceTrackerService exte
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeReconnectEvent(nodeId, rmNode));
}
-
+ // On every node manager register we will be clearing NMToken keys if
+ // present for any running application.
+ this.nmTokenSecretManager.removeNodeKey(nodeId);
this.nmLivelinessMonitor.register(nodeId);
String message =
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java Sun Jun 16 03:12:31 2013
@@ -18,18 +18,35 @@
package org.apache.hadoop.yarn.server.resourcemanager.security;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
+import com.google.common.annotations.VisibleForTesting;
+
public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
@@ -42,6 +59,7 @@ public class NMTokenSecretManagerInRM ex
private final Timer timer;
private final long rollingInterval;
private final long activationDelay;
+ private final ConcurrentHashMap<ApplicationAttemptId, HashSet<NodeId>> appAttemptToNodeKeyMap;
public NMTokenSecretManagerInRM(Configuration conf) {
this.conf = conf;
@@ -70,6 +88,8 @@ public class NMTokenSecretManagerInRM ex
+ " should be more than 2 X "
+ YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS);
}
+ appAttemptToNodeKeyMap =
+ new ConcurrentHashMap<ApplicationAttemptId, HashSet<NodeId>>();
}
/**
@@ -119,11 +139,23 @@ public class NMTokenSecretManagerInRM ex
+ this.nextMasterKey.getMasterKey().getKeyId());
this.currentMasterKey = this.nextMasterKey;
this.nextMasterKey = null;
+ clearApplicationNMTokenKeys();
} finally {
super.writeLock.unlock();
}
}
+ private void clearApplicationNMTokenKeys() {
+ // We should clear all node entries from this set.
+ // TODO : Once we have per node master key then it will change to only
+ // remove specific node from it.
+ Iterator<HashSet<NodeId>> nodeSetI =
+ this.appAttemptToNodeKeyMap.values().iterator();
+ while (nodeSetI.hasNext()) {
+ nodeSetI.next().clear();
+ }
+ }
+
public void start() {
rollMasterKey();
this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval,
@@ -150,4 +182,129 @@ public class NMTokenSecretManagerInRM ex
activateNextMasterKey();
}
}
+
+ public List<NMToken> getNMTokens(String applicationSubmitter,
+ ApplicationAttemptId appAttemptId, List<Container> containers) {
+ try {
+ this.readLock.lock();
+ List<NMToken> nmTokens = new ArrayList<NMToken>();
+ HashSet<NodeId> nodeSet = this.appAttemptToNodeKeyMap.get(appAttemptId);
+ if (nodeSet != null) {
+ for (Container container : containers) {
+ if (!nodeSet.contains(container.getNodeId())) {
+ LOG.debug("Sending NMToken for nodeId : "
+ + container.getNodeId().toString());
+ Token token = createNMToken(appAttemptId, container.getNodeId(),
+ applicationSubmitter);
+ NMToken nmToken =
+ NMToken.newInstance(container.getNodeId(), token);
+ nmTokens.add(nmToken);
+ nodeSet.add(container.getNodeId());
+ }
+ }
+ }
+ return nmTokens;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ public void registerApplicationAttempt(ApplicationAttemptId appAttemptId) {
+ try {
+ this.writeLock.lock();
+ this.appAttemptToNodeKeyMap.put(appAttemptId, new HashSet<NodeId>());
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ @Private
+ @VisibleForTesting
+ public boolean isApplicationAttemptRegistered(
+ ApplicationAttemptId appAttemptId) {
+ try {
+ this.readLock.lock();
+ return this.appAttemptToNodeKeyMap.containsKey(appAttemptId);
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ @Private
+ @VisibleForTesting
+ public boolean isApplicationAttemptNMTokenPresent(
+ ApplicationAttemptId appAttemptId, NodeId nodeId) {
+ try {
+ this.readLock.lock();
+ HashSet<NodeId> nodes = this.appAttemptToNodeKeyMap.get(appAttemptId);
+ if (nodes != null && nodes.contains(nodeId)) {
+ return true;
+ } else {
+ return false;
+ }
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ public void unregisterApplicationAttempt(ApplicationAttemptId appAttemptId) {
+ try {
+ this.writeLock.lock();
+ this.appAttemptToNodeKeyMap.remove(appAttemptId);
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ /**
+ * This is to be called when NodeManager reconnects or goes down. This will
+ * remove if NMTokens if present for any running application from cache.
+ * @param nodeId
+ */
+ public void removeNodeKey(NodeId nodeId) {
+ try {
+ this.writeLock.lock();
+ Iterator<HashSet<NodeId>> appNodeKeySetIterator =
+ this.appAttemptToNodeKeyMap.values().iterator();
+ while (appNodeKeySetIterator.hasNext()) {
+ appNodeKeySetIterator.next().remove(nodeId);
+ }
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ public static Token newNMToken(byte[] password,
+ NMTokenIdentifier identifier) {
+ NodeId nodeId = identifier.getNodeId();
+ // RPC layer client expects ip:port as service for tokens
+ InetSocketAddress addr =
+ NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort());
+ Token nmToken =
+ Token.newInstance(identifier.getBytes(),
+ NMTokenIdentifier.KIND.toString(), password, SecurityUtil
+ .buildTokenService(addr).toString());
+ return nmToken;
+ }
+
+ /**
+ * Helper function for creating NMTokens.
+ */
+ public Token createNMToken(ApplicationAttemptId applicationAttemptId,
+ NodeId nodeId, String applicationSubmitter) {
+ byte[] password;
+ NMTokenIdentifier identifier;
+
+ this.readLock.lock();
+ try {
+ identifier =
+ new NMTokenIdentifier(applicationAttemptId, nodeId,
+ applicationSubmitter, this.currentMasterKey.getMasterKey()
+ .getKeyId());
+ password = this.createPassword(identifier);
+ } finally {
+ this.readLock.unlock();
+ }
+ return newNMToken(password, identifier);
+ }
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java Sun Jun 16 03:12:31 2013
@@ -65,12 +65,12 @@ public class MockAM {
RMAppAttempt attempt = app.getRMAppAttempt(attemptId);
int timeoutSecs = 0;
while (!finalState.equals(attempt.getAppAttemptState())
- && timeoutSecs++ < 20) {
+ && timeoutSecs++ < 40) {
System.out
.println("AppAttempt : " + attemptId + " State is : "
+ attempt.getAppAttemptState()
+ " Waiting for state : " + finalState);
- Thread.sleep(500);
+ Thread.sleep(1000);
}
System.out.println("AppAttempt State is : " + attempt.getAppAttemptState());
Assert.assertEquals("AppAttempt state is not correct (timedout)",
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java Sun Jun 16 03:12:31 2013
@@ -19,22 +19,27 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -136,6 +141,184 @@ public class TestRM {
rm.stop();
}
+
+ @Test
+ public void testNMToken() throws Exception {
+ MockRM rm = new MockRM();
+ try {
+ rm.start();
+ MockNM nm1 = rm.registerNode("h1:1234", 10000);
+
+ NMTokenSecretManagerInRM nmTokenSecretManager =
+ rm.getRMContext().getNMTokenSecretManager();
+
+ // submitting new application
+ RMApp app = rm.submitApp(1000);
+
+ // start scheduling.
+ nm1.nodeHeartbeat(true);
+
+ // Starting application attempt and launching
+ // It should get registered with NMTokenSecretManager.
+ RMAppAttempt attempt = app.getCurrentAppAttempt();
+
+ MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+
+ Assert.assertTrue(nmTokenSecretManager
+ .isApplicationAttemptRegistered(attempt.getAppAttemptId()));
+
+ // This will register application master.
+ am.registerAppAttempt();
+
+ ArrayList<Container> containersReceivedForNM1 =
+ new ArrayList<Container>();
+ List<ContainerId> releaseContainerList =
+ new ArrayList<ContainerId>();
+ HashMap<String, Token> nmTokens = new HashMap<String, Token>();
+
+ // initially requesting 2 containers.
+ AllocateResponse response =
+ am.allocate("h1", 1000, 2, releaseContainerList);
+ nm1.nodeHeartbeat(true);
+ Assert.assertEquals(0, response.getAllocatedContainers().size());
+ allocateContainersAndValidateNMTokens(am, containersReceivedForNM1, 2,
+ nmTokens);
+ Assert.assertEquals(1, nmTokens.size());
+
+
+ // requesting 2 more containers.
+ response = am.allocate("h1", 1000, 2, releaseContainerList);
+ nm1.nodeHeartbeat(true);
+ Assert.assertEquals(0, response.getAllocatedContainers().size());
+ allocateContainersAndValidateNMTokens(am, containersReceivedForNM1, 4,
+ nmTokens);
+ Assert.assertEquals(1, nmTokens.size());
+
+
+ // We will be simulating NM restart so restarting newly added h2:1234
+ // NM 2 now registers.
+ MockNM nm2 = rm.registerNode("h2:1234", 10000);
+ nm2.nodeHeartbeat(true);
+ ArrayList<Container> containersReceivedForNM2 =
+ new ArrayList<Container>();
+
+ response = am.allocate("h2", 1000, 2, releaseContainerList);
+ nm2.nodeHeartbeat(true);
+ Assert.assertEquals(0, response.getAllocatedContainers().size());
+ allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 2,
+ nmTokens);
+ Assert.assertEquals(2, nmTokens.size());
+
+ // Simulating NM-2 restart.
+ nm2 = rm.registerNode("h2:1234", 10000);
+ nm2.nodeHeartbeat(true);
+
+ int interval = 40;
+ // Wait for nm Token to be cleared.
+ while (nmTokenSecretManager
+ .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
+ nm2.getNodeId()) && interval-- > 0) {
+ LOG.info("waiting for nmToken to be cleared for : " + nm2.getNodeId());
+ Thread.sleep(1000);
+ }
+ Assert.assertTrue(nmTokenSecretManager
+ .isApplicationAttemptRegistered(attempt.getAppAttemptId()));
+
+ // removing NMToken for h2:1234
+ nmTokens.remove(nm2.getNodeId().toString());
+ Assert.assertEquals(1, nmTokens.size());
+
+ // We should again receive the NMToken.
+ response = am.allocate("h2", 1000, 2, releaseContainerList);
+ nm2.nodeHeartbeat(true);
+ Assert.assertEquals(0, response.getAllocatedContainers().size());
+ allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 4,
+ nmTokens);
+ Assert.assertEquals(2, nmTokens.size());
+
+ // Now rolling over NMToken masterKey. it should resend the NMToken in
+ // next allocate call.
+ Assert.assertTrue(nmTokenSecretManager
+ .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
+ nm1.getNodeId()));
+ Assert.assertTrue(nmTokenSecretManager
+ .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
+ nm2.getNodeId()));
+
+ nmTokenSecretManager.rollMasterKey();
+ nmTokenSecretManager.activateNextMasterKey();
+
+ Assert.assertFalse(nmTokenSecretManager
+ .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
+ nm1.getNodeId()));
+ Assert.assertFalse(nmTokenSecretManager
+ .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
+ nm2.getNodeId()));
+ // It should not remove application attempt entry.
+ Assert.assertTrue(nmTokenSecretManager
+ .isApplicationAttemptRegistered(attempt.getAppAttemptId()));
+
+ nmTokens.clear();
+ Assert.assertEquals(0, nmTokens.size());
+ // We should again receive the NMToken.
+ response = am.allocate("h2", 1000, 1, releaseContainerList);
+ nm2.nodeHeartbeat(true);
+ Assert.assertEquals(0, response.getAllocatedContainers().size());
+ allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 5,
+ nmTokens);
+ Assert.assertEquals(1, nmTokens.size());
+ Assert.assertTrue(nmTokenSecretManager
+ .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
+ nm2.getNodeId()));
+
+
+ // After AM is finished making sure that nmtoken entry for app
+ Assert.assertTrue(nmTokenSecretManager
+ .isApplicationAttemptRegistered(attempt.getAppAttemptId()));
+ am.unregisterAppAttempt();
+ // marking all the containers as finished.
+ for (Container container : containersReceivedForNM1) {
+ nm1.nodeHeartbeat(attempt.getAppAttemptId(), container.getId().getId(),
+ ContainerState.COMPLETE);
+ }
+ for (Container container : containersReceivedForNM2) {
+ nm2.nodeHeartbeat(attempt.getAppAttemptId(), container.getId().getId(),
+ ContainerState.COMPLETE);
+ }
+ am.waitForState(RMAppAttemptState.FINISHED);
+ Assert.assertFalse(nmTokenSecretManager
+ .isApplicationAttemptRegistered(attempt.getAppAttemptId()));
+ } finally {
+ rm.stop();
+ }
+ }
+
+ protected void allocateContainersAndValidateNMTokens(MockAM am,
+ ArrayList<Container> containersReceived, int totalContainerRequested,
+ HashMap<String, Token> nmTokens) throws Exception, InterruptedException {
+ ArrayList<ContainerId> releaseContainerList = new ArrayList<ContainerId>();
+ AllocateResponse response;
+ ArrayList<ResourceRequest> resourceRequest =
+ new ArrayList<ResourceRequest>();
+ while (containersReceived.size() < totalContainerRequested) {
+ LOG.info("requesting containers..");
+ response =
+ am.allocate(resourceRequest, releaseContainerList);
+ containersReceived.addAll(response.getAllocatedContainers());
+ if (!response.getNMTokens().isEmpty()) {
+ for (NMToken nmToken : response.getNMTokens()) {
+ String nodeId = nmToken.getNodeId().toString();
+ if (nmTokens.containsKey(nodeId)) {
+ Assert.fail("Duplicate NMToken received for : " + nodeId);
+ }
+ nmTokens.put(nodeId, nmToken.getToken());
+ }
+ }
+ LOG.info("Got " + containersReceived.size()
+ + " containers. Waiting to get " + totalContainerRequested);
+ Thread.sleep(500);
+ }
+ }
@Test (timeout = 300000)
public void testActivatingApplicationAfterAddingNM() throws Exception {