You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2014/02/26 21:20:40 UTC
svn commit: r1572232 - in
/hadoop/common/branches/branch-2/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/
hadoo...
Author: vinodkv
Date: Wed Feb 26 20:20:39 2014
New Revision: 1572232
URL: http://svn.apache.org/r1572232
Log:
YARN-1588. Enhanced RM and the scheduling protocol to also send NMTokens of transferred containers from previous app-attempts to new AMs after YARN-1490. Contributed by Jian He.
svn merge --ignore-ancestry -c 1572230 ../../trunk/
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1572232&r1=1572231&r2=1572232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Wed Feb 26 20:20:39 2014
@@ -131,6 +131,10 @@ Release 2.4.0 - UNRELEASED
YARN-1497. Command line additions for moving apps between queues (Sandy
Ryza)
+ YARN-1588. Enhanced RM and the scheduling protocol to also send NMTokens of
+ transferred containers from previous app-attempts to new AMs after YARN-1490.
+ (Jian He via vinodkv)
+
IMPROVEMENTS
YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java?rev=1572232&r1=1572231&r2=1572232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java Wed Feb 26 20:20:39 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records;
@@ -55,13 +56,15 @@ public abstract class RegisterApplicatio
public static RegisterApplicationMasterResponse newInstance(
Resource minCapability, Resource maxCapability,
Map<ApplicationAccessType, String> acls, ByteBuffer key,
- List<Container> containersFromPreviousAttempt, String queue) {
+ List<Container> containersFromPreviousAttempt, String queue,
+ List<NMToken> nmTokensFromPreviousAttempts) {
RegisterApplicationMasterResponse response =
Records.newRecord(RegisterApplicationMasterResponse.class);
response.setMaximumResourceCapability(maxCapability);
response.setApplicationACLs(acls);
response.setClientToAMTokenMasterKey(key);
- response.setContainersFromPreviousAttempt(containersFromPreviousAttempt);
+ response.setContainersFromPreviousAttempts(containersFromPreviousAttempt);
+ response.setNMTokensFromPreviousAttempts(nmTokensFromPreviousAttempts);
response.setQueue(queue);
return response;
}
@@ -129,26 +132,52 @@ public abstract class RegisterApplicatio
/**
* <p>
* Get the list of running containers as viewed by
- * <code>ResourceManager</code> from previous application attempt.
+ * <code>ResourceManager</code> from previous application attempts.
* </p>
*
* @return the list of running containers as viewed by
- * <code>ResourceManager</code> from previous application attempt
+ * <code>ResourceManager</code> from previous application attempts
+ * @see RegisterApplicationMasterResponse#getNMTokensFromPreviousAttempts()
*/
@Public
@Unstable
- public abstract List<Container> getContainersFromPreviousAttempt();
+ public abstract List<Container> getContainersFromPreviousAttempts();
/**
* Set the list of running containers as viewed by
- * <code>ResourceManager</code> from previous application attempt.
+ * <code>ResourceManager</code> from previous application attempts.
*
* @param containersFromPreviousAttempt
* the list of running containers as viewed by
- * <code>ResourceManager</code> from previous application attempt.
+ * <code>ResourceManager</code> from previous application attempts.
*/
@Private
@Unstable
- public abstract void setContainersFromPreviousAttempt(
+ public abstract void setContainersFromPreviousAttempts(
List<Container> containersFromPreviousAttempt);
+
+ /**
+ * Get the list of NMTokens for communicating with the NMs where the
+ * containers of previous application attempts are running.
+ *
+ * @return the list of NMTokens for communicating with the NMs where the
+ * containers of previous application attempts are running.
+ *
+ * @see RegisterApplicationMasterResponse#getContainersFromPreviousAttempts()
+ */
+ @Public
+ @Stable
+ public abstract List<NMToken> getNMTokensFromPreviousAttempts();
+
+ /**
+ * Set the list of NMTokens for communicating with the NMs where the
+ * containers of previous application attempts are running.
+ *
+ * @param nmTokens
+ * the list of NMTokens for communicating with the NMs where the
+ * containers of previous application attempts are running.
+ */
+ @Private
+ @Unstable
+ public abstract void setNMTokensFromPreviousAttempts(List<NMToken> nmTokens);
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java?rev=1572232&r1=1572231&r2=1572232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java Wed Feb 26 20:20:39 2014
@@ -72,4 +72,37 @@ public abstract class NMToken {
@Stable
public abstract void setToken(Token token);
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result =
+ prime * result + ((getNodeId() == null) ? 0 : getNodeId().hashCode());
+ result =
+ prime * result + ((getToken() == null) ? 0 : getToken().hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ NMToken other = (NMToken) obj;
+ if (getNodeId() == null) {
+ if (other.getNodeId() != null)
+ return false;
+ } else if (!getNodeId().equals(other.getNodeId()))
+ return false;
+ if (getToken() == null) {
+ if (other.getToken() != null)
+ return false;
+ } else if (!getToken().equals(other.getToken()))
+ return false;
+ return true;
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto?rev=1572232&r1=1572231&r2=1572232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto Wed Feb 26 20:20:39 2014
@@ -44,8 +44,9 @@ message RegisterApplicationMasterRespons
optional ResourceProto maximumCapability = 1;
optional bytes client_to_am_token_master_key = 2;
repeated ApplicationACLMapProto application_ACLs = 3;
- repeated ContainerProto containers_from_previous_attempt = 4;
+ repeated ContainerProto containers_from_previous_attempts = 4;
optional string queue = 5;
+ repeated NMTokenProto nm_tokens_from_previous_attempts = 6;
}
message FinishApplicationMasterRequestProto {
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1572232&r1=1572231&r2=1572232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Wed Feb 26 20:20:39 2014
@@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -542,7 +543,7 @@ public class ApplicationMaster {
}
List<Container> previousAMRunningContainers =
- response.getContainersFromPreviousAttempt();
+ response.getContainersFromPreviousAttempts();
LOG.info("Received " + previousAMRunningContainers.size()
+ " previous AM's running containers on AM registration.");
numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java?rev=1572232&r1=1572231&r2=1572232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java Wed Feb 26 20:20:39 2014
@@ -195,6 +195,12 @@ public class AMRMClientImpl<T extends Co
appTrackingUrl);
RegisterApplicationMasterResponse response =
rmClient.registerApplicationMaster(request);
+
+ synchronized (this) {
+ if(!response.getNMTokensFromPreviousAttempts().isEmpty()) {
+ populateNMTokens(response.getNMTokensFromPreviousAttempts());
+ }
+ }
return response;
}
@@ -250,7 +256,7 @@ public class AMRMClientImpl<T extends Co
lastResponseId = allocateResponse.getResponseId();
clusterAvailableResources = allocateResponse.getAvailableResources();
if (!allocateResponse.getNMTokens().isEmpty()) {
- populateNMTokens(allocateResponse);
+ populateNMTokens(allocateResponse.getNMTokens());
}
}
} finally {
@@ -284,13 +290,17 @@ public class AMRMClientImpl<T extends Co
@Private
@VisibleForTesting
- protected void populateNMTokens(AllocateResponse allocateResponse) {
- for (NMToken token : allocateResponse.getNMTokens()) {
+ protected void populateNMTokens(List<NMToken> nmTokens) {
+ for (NMToken token : nmTokens) {
String nodeId = token.getNodeId().toString();
if (getNMTokenCache().containsToken(nodeId)) {
- LOG.debug("Replacing token for : " + nodeId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Replacing token for : " + nodeId);
+ }
} else {
- LOG.debug("Received new token for : " + nodeId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received new token for : " + nodeId);
+ }
}
getNMTokenCache().setToken(nodeId, token.getToken());
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java?rev=1572232&r1=1572231&r2=1572232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java Wed Feb 26 20:20:39 2014
@@ -31,13 +31,16 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProtoOrBuilder;
@@ -56,7 +59,8 @@ public class RegisterApplicationMasterRe
private Resource maximumResourceCapability;
private Map<ApplicationAccessType, String> applicationACLS = null;
- private List<Container> containersFromPreviousAttempt = null;
+ private List<Container> containersFromPreviousAttempts = null;
+ private List<NMToken> nmTokens = null;
public RegisterApplicationMasterResponsePBImpl() {
builder = RegisterApplicationMasterResponseProto.newBuilder();
@@ -110,8 +114,13 @@ public class RegisterApplicationMasterRe
if (this.applicationACLS != null) {
addApplicationACLs();
}
- if (this.containersFromPreviousAttempt != null) {
- addRunningContainersToProto();
+ if (this.containersFromPreviousAttempts != null) {
+ addContainersFromPreviousAttemptToProto();
+ }
+ if (nmTokens != null) {
+ builder.clearNmTokensFromPreviousAttempts();
+ Iterable<NMTokenProto> iterable = getTokenProtoIterable(nmTokens);
+ builder.addAllNmTokensFromPreviousAttempts(iterable);
}
}
@@ -236,21 +245,22 @@ public class RegisterApplicationMasterRe
}
@Override
- public List<Container> getContainersFromPreviousAttempt() {
- if (this.containersFromPreviousAttempt != null) {
- return this.containersFromPreviousAttempt;
+ public List<Container> getContainersFromPreviousAttempts() {
+ if (this.containersFromPreviousAttempts != null) {
+ return this.containersFromPreviousAttempts;
}
- initRunningContainersList();
- return this.containersFromPreviousAttempt;
+ initContainersPreviousAttemptList();
+ return this.containersFromPreviousAttempts;
}
@Override
- public void setContainersFromPreviousAttempt(final List<Container> containers) {
+ public void
+ setContainersFromPreviousAttempts(final List<Container> containers) {
if (containers == null) {
return;
}
- this.containersFromPreviousAttempt = new ArrayList<Container>();
- this.containersFromPreviousAttempt.addAll(containers);
+ this.containersFromPreviousAttempts = new ArrayList<Container>();
+ this.containersFromPreviousAttempts.addAll(containers);
}
@Override
@@ -272,25 +282,88 @@ public class RegisterApplicationMasterRe
}
}
- private void initRunningContainersList() {
- RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder;
- List<ContainerProto> list = p.getContainersFromPreviousAttemptList();
- containersFromPreviousAttempt = new ArrayList<Container>();
+
+ private void initContainersPreviousAttemptList() {
+ RegisterApplicationMasterResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ List<ContainerProto> list = p.getContainersFromPreviousAttemptsList();
+ containersFromPreviousAttempts = new ArrayList<Container>();
for (ContainerProto c : list) {
- containersFromPreviousAttempt.add(convertFromProtoFormat(c));
+ containersFromPreviousAttempts.add(convertFromProtoFormat(c));
}
}
- private void addRunningContainersToProto() {
+ private void addContainersFromPreviousAttemptToProto() {
maybeInitBuilder();
- builder.clearContainersFromPreviousAttempt();
+ builder.clearContainersFromPreviousAttempts();
List<ContainerProto> list = new ArrayList<ContainerProto>();
- for (Container c : containersFromPreviousAttempt) {
+ for (Container c : containersFromPreviousAttempts) {
list.add(convertToProtoFormat(c));
}
- builder.addAllContainersFromPreviousAttempt(list);
+ builder.addAllContainersFromPreviousAttempts(list);
+ }
+
+
+ @Override
+ public List<NMToken> getNMTokensFromPreviousAttempts() {
+ if (nmTokens != null) {
+ return nmTokens;
+ }
+ initLocalNewNMTokenList();
+ return nmTokens;
}
+ @Override
+ public void setNMTokensFromPreviousAttempts(final List<NMToken> nmTokens) {
+ if (nmTokens == null || nmTokens.isEmpty()) {
+ if (this.nmTokens != null) {
+ this.nmTokens.clear();
+ }
+ builder.clearNmTokensFromPreviousAttempts();
+ return;
+ }
+ this.nmTokens = new ArrayList<NMToken>();
+ this.nmTokens.addAll(nmTokens);
+ }
+
+ private synchronized void initLocalNewNMTokenList() {
+ RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder;
+ List<NMTokenProto> list = p.getNmTokensFromPreviousAttemptsList();
+ nmTokens = new ArrayList<NMToken>();
+ for (NMTokenProto t : list) {
+ nmTokens.add(convertFromProtoFormat(t));
+ }
+ }
+
+ private synchronized Iterable<NMTokenProto> getTokenProtoIterable(
+ final List<NMToken> nmTokenList) {
+ maybeInitBuilder();
+ return new Iterable<NMTokenProto>() {
+ @Override
+ public synchronized Iterator<NMTokenProto> iterator() {
+ return new Iterator<NMTokenProto>() {
+
+ Iterator<NMToken> iter = nmTokenList.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public NMTokenProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ }
+
private Resource convertFromProtoFormat(ResourceProto resource) {
return new ResourcePBImpl(resource);
}
@@ -306,4 +379,12 @@ public class RegisterApplicationMasterRe
private ContainerProto convertToProtoFormat(Container t) {
return ((ContainerPBImpl) t).getProto();
}
+
+ private NMTokenProto convertToProtoFormat(NMToken token) {
+ return ((NMTokenPBImpl) token).getProto();
+ }
+
+ private NMToken convertFromProtoFormat(NMTokenProto proto) {
+ return new NMTokenPBImpl(proto);
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java?rev=1572232&r1=1572231&r2=1572232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java Wed Feb 26 20:20:39 2014
@@ -47,7 +47,7 @@ public class NMTokenPBImpl extends NMTok
this.proto = proto;
viaProto = true;
}
-
+
@Override
public synchronized NodeId getNodeId() {
NMTokenProtoOrBuilder p = viaProto ? proto : builder;
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1572232&r1=1572231&r2=1572232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Wed Feb 26 20:20:39 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.PreemptionContainer;
import org.apache.hadoop.yarn.api.records.PreemptionContract;
@@ -280,10 +282,32 @@ public class ApplicationMasterService ex
.getMasterKey(applicationAttemptId).getEncoded()));
}
- List<Container> containerList =
+ // For work-preserving AM restart, retrieve previous attempts' containers
+ // and corresponding NM tokens.
+ List<Container> transferredContainers =
((AbstractYarnScheduler) rScheduler)
.getTransferredContainers(applicationAttemptId);
- response.setContainersFromPreviousAttempt(containerList);
+ if (!transferredContainers.isEmpty()) {
+ response.setContainersFromPreviousAttempts(transferredContainers);
+ List<NMToken> nmTokens = new ArrayList<NMToken>();
+ for (Container container : transferredContainers) {
+ try {
+ nmTokens.add(rmContext.getNMTokenSecretManager()
+ .createAndGetNMToken(app.getUser(), applicationAttemptId,
+ container));
+ } catch (IllegalArgumentException e) {
+ // if it's a DNS issue, throw UnknownHostException directly and that
+ // will be automatically retried by RMProxy in RPC layer.
+ if (e.getCause() instanceof UnknownHostException) {
+ throw (UnknownHostException) e.getCause();
+ }
+ }
+ }
+ response.setNMTokensFromPreviousAttempts(nmTokens);
+ LOG.info("Application " + appID + " retrieved "
+ + transferredContainers.size() + " containers from previous"
+ + " attempts and " + nmTokens.size() + " NM tokens.");
+ }
return response;
}
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1572232&r1=1572231&r2=1572232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Wed Feb 26 20:20:39 2014
@@ -385,9 +385,8 @@ public class SchedulerApplicationAttempt
}
} catch (IllegalArgumentException e) {
// DNS might be down, skip returning this container.
- LOG.error(
- "Error trying to assign container token to allocated container "
- + container.getId(), e);
+ LOG.error("Error trying to assign container token and NM token to" +
+ " an allocated container " + container.getId(), e);
continue;
}
returnContainerList.add(container);
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1572232&r1=1572231&r2=1572232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Wed Feb 26 20:20:39 2014
@@ -486,6 +486,7 @@ public class MockRM extends ResourceMana
public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
throws Exception {
+ rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
RMAppAttempt attempt = app.getCurrentAppAttempt();
nm.nodeHeartbeat(true);
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java?rev=1572232&r1=1572231&r2=1572232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java Wed Feb 26 20:20:39 2014
@@ -24,6 +24,7 @@ import java.util.List;
import junit.framework.Assert;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
@@ -160,11 +162,11 @@ public class TestAMRestart {
am2.registerAppAttempt();
// Assert two containers are running: container2 and container3;
- Assert.assertEquals(2, registerResponse.getContainersFromPreviousAttempt()
+ Assert.assertEquals(2, registerResponse.getContainersFromPreviousAttempts()
.size());
boolean containerId2Exists = false, containerId3Exists = false;
for (Container container : registerResponse
- .getContainersFromPreviousAttempt()) {
+ .getContainersFromPreviousAttempts()) {
if (container.getId().equals(containerId2)) {
containerId2Exists = true;
}
@@ -232,4 +234,100 @@ public class TestAMRestart {
rm1.stop();
}
+
+ @Test
+ public void testNMTokensRebindOnAMRestart() throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3);
+
+ MockRM rm1 = new MockRM(conf);
+ rm1.start();
+ RMApp app1 =
+ rm1.submitApp(200, "myname", "myuser",
+ new HashMap<ApplicationAccessType, String>(), false, "default", -1,
+ null, "MAPREDUCE", false, true);
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ MockNM nm2 =
+ new MockNM("127.1.1.1:4321", 8000, rm1.getResourceTrackerService());
+ nm2.registerNode();
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ int NUM_CONTAINERS = 1;
+ List<Container> containers = new ArrayList<Container>();
+ // expectedNMTokens keeps track of all the NM tokens issued in the allocate calls.
+ List<NMToken> expectedNMTokens = new ArrayList<NMToken>();
+
+ // am1 allocate 1 container on nm1.
+ while (true) {
+ AllocateResponse response =
+ am1.allocate("127.0.0.1", 2000, NUM_CONTAINERS,
+ new ArrayList<ContainerId>());
+ nm1.nodeHeartbeat(true);
+ containers.addAll(response.getAllocatedContainers());
+ expectedNMTokens.addAll(response.getNMTokens());
+ if (containers.size() == NUM_CONTAINERS) {
+ break;
+ }
+ Thread.sleep(200);
+ System.out.println("Waiting for container to be allocated.");
+ }
+ // launch the container
+ nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
+ ContainerId containerId2 =
+ ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
+ rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
+
+ // fail am1
+ nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ am1.waitForState(RMAppAttemptState.FAILED);
+ rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+
+ // restart the am
+ MockAM am2 = MockRM.launchAM(app1, rm1, nm1);
+ RegisterApplicationMasterResponse registerResponse =
+ am2.registerAppAttempt();
+ rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+
+ // check that am2 gets the NM token from am1.
+ Assert.assertEquals(expectedNMTokens,
+ registerResponse.getNMTokensFromPreviousAttempts());
+
+ // am2 allocate 1 container on nm2
+ containers = new ArrayList<Container>();
+ while (true) {
+ AllocateResponse allocateResponse =
+ am2.allocate("127.1.1.1", 4000, NUM_CONTAINERS,
+ new ArrayList<ContainerId>());
+ nm2.nodeHeartbeat(true);
+ containers.addAll(allocateResponse.getAllocatedContainers());
+ expectedNMTokens.addAll(allocateResponse.getNMTokens());
+ if (containers.size() == NUM_CONTAINERS) {
+ break;
+ }
+ Thread.sleep(200);
+ System.out.println("Waiting for container to be allocated.");
+ }
+ nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 2, ContainerState.RUNNING);
+ ContainerId am2ContainerId2 =
+ ContainerId.newInstance(am2.getApplicationAttemptId(), 2);
+ rm1.waitForState(nm1, am2ContainerId2, RMContainerState.RUNNING);
+
+ // fail am2.
+ nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ am2.waitForState(RMAppAttemptState.FAILED);
+ rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+
+ // restart am
+ MockAM am3 = MockRM.launchAM(app1, rm1, nm1);
+ registerResponse = am3.registerAppAttempt();
+ rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+
+ // check that am3 gets the NM tokens from both am1 and am2.
+ List<NMToken> transferredTokens = registerResponse.getNMTokensFromPreviousAttempts();
+ Assert.assertEquals(2, transferredTokens.size());
+ Assert.assertTrue(transferredTokens.containsAll(expectedNMTokens));
+ rm1.stop();
+ }
}