You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2014/02/26 21:20:40 UTC

svn commit: r1572232 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ hadoo...

Author: vinodkv
Date: Wed Feb 26 20:20:39 2014
New Revision: 1572232

URL: http://svn.apache.org/r1572232
Log:
YARN-1588. Enhanced RM and the scheduling protocol to also send NMTokens of transferred containers from previous app-attempts to new AMs after YARN-1490. Contributed by Jian He.
svn merge --ignore-ancestry -c 1572230 ../../trunk/

Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1572232&r1=1572231&r2=1572232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Wed Feb 26 20:20:39 2014
@@ -131,6 +131,10 @@ Release 2.4.0 - UNRELEASED
     YARN-1497. Command line additions for moving apps between queues (Sandy
     Ryza)
 
+    YARN-1588. Enhanced RM and the scheduling protocol to also send NMTokens of
+    transferred containers from previous app-attempts to new AMs after YARN-1490.
+    (Jian He via vinodkv)
+
   IMPROVEMENTS
 
     YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java?rev=1572232&r1=1572231&r2=1572232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java Wed Feb 26 20:20:39 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.util.Records;
 
@@ -55,13 +56,15 @@ public abstract class RegisterApplicatio
   public static RegisterApplicationMasterResponse newInstance(
       Resource minCapability, Resource maxCapability,
       Map<ApplicationAccessType, String> acls, ByteBuffer key,
-      List<Container> containersFromPreviousAttempt, String queue) {
+      List<Container> containersFromPreviousAttempt, String queue,
+      List<NMToken> nmTokensFromPreviousAttempts) {
     RegisterApplicationMasterResponse response =
         Records.newRecord(RegisterApplicationMasterResponse.class);
     response.setMaximumResourceCapability(maxCapability);
     response.setApplicationACLs(acls);
     response.setClientToAMTokenMasterKey(key);
-    response.setContainersFromPreviousAttempt(containersFromPreviousAttempt);
+    response.setContainersFromPreviousAttempts(containersFromPreviousAttempt);
+    response.setNMTokensFromPreviousAttempts(nmTokensFromPreviousAttempts);
     response.setQueue(queue);
     return response;
   }
@@ -129,26 +132,52 @@ public abstract class RegisterApplicatio
   /**
    * <p>
    * Get the list of running containers as viewed by
-   * <code>ResourceManager</code> from previous application attempt.
+   * <code>ResourceManager</code> from previous application attempts.
    * </p>
    * 
    * @return the list of running containers as viewed by
-   *         <code>ResourceManager</code> from previous application attempt
+   *         <code>ResourceManager</code> from previous application attempts
+   * @see RegisterApplicationMasterResponse#getNMTokensFromPreviousAttempts()
    */
   @Public
   @Unstable
-  public abstract List<Container> getContainersFromPreviousAttempt();
+  public abstract List<Container> getContainersFromPreviousAttempts();
 
   /**
    * Set the list of running containers as viewed by
-   * <code>ResourceManager</code> from previous application attempt.
+   * <code>ResourceManager</code> from previous application attempts.
    * 
    * @param containersFromPreviousAttempt
    *          the list of running containers as viewed by
-   *          <code>ResourceManager</code> from previous application attempt.
+   *          <code>ResourceManager</code> from previous application attempts.
    */
   @Private
   @Unstable
-  public abstract void setContainersFromPreviousAttempt(
+  public abstract void setContainersFromPreviousAttempts(
       List<Container> containersFromPreviousAttempt);
+
+  /**
+   * Get the list of NMTokens for communicating with the NMs where the
+   * containers of previous application attempts are running.
+   * 
+   * @return the list of NMTokens for communicating with the NMs where the
+   *         containers of previous application attempts are running.
+   * 
+   * @see RegisterApplicationMasterResponse#getContainersFromPreviousAttempts()
+   */
+  @Public
+  @Stable
+  public abstract List<NMToken> getNMTokensFromPreviousAttempts();
+
+  /**
+   * Set the list of NMTokens for communicating with the NMs where the
+   * containers of previous application attempts are running.
+   * 
+   * @param nmTokens
+   *          the list of NMTokens for communicating with the NMs where the
+   *          containers of previous application attempts are running.
+   */
+  @Private
+  @Unstable
+  public abstract void setNMTokensFromPreviousAttempts(List<NMToken> nmTokens);
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java?rev=1572232&r1=1572231&r2=1572232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java Wed Feb 26 20:20:39 2014
@@ -72,4 +72,37 @@ public abstract class NMToken {
   @Stable
   public abstract void setToken(Token token);
 
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result =
+        prime * result + ((getNodeId() == null) ? 0 : getNodeId().hashCode());
+    result =
+        prime * result + ((getToken() == null) ? 0 : getToken().hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    NMToken other = (NMToken) obj;
+    if (getNodeId() == null) {
+      if (other.getNodeId() != null)
+        return false;
+    } else if (!getNodeId().equals(other.getNodeId()))
+      return false;
+    if (getToken() == null) {
+      if (other.getToken() != null)
+        return false;
+    } else if (!getToken().equals(other.getToken()))
+      return false;
+    return true;
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto?rev=1572232&r1=1572231&r2=1572232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto Wed Feb 26 20:20:39 2014
@@ -44,8 +44,9 @@ message RegisterApplicationMasterRespons
   optional ResourceProto maximumCapability = 1;
   optional bytes client_to_am_token_master_key = 2;
   repeated ApplicationACLMapProto application_ACLs = 3;
-  repeated ContainerProto containers_from_previous_attempt = 4;
+  repeated ContainerProto containers_from_previous_attempts = 4;
   optional string queue = 5;
+  repeated NMTokenProto nm_tokens_from_previous_attempts = 6;
 }
 
 message FinishApplicationMasterRequestProto {

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1572232&r1=1572231&r2=1572232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Wed Feb 26 20:20:39 2014
@@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -542,7 +543,7 @@ public class ApplicationMaster {
     }
 
     List<Container> previousAMRunningContainers =
-        response.getContainersFromPreviousAttempt();
+        response.getContainersFromPreviousAttempts();
     LOG.info("Received " + previousAMRunningContainers.size()
         + " previous AM's running containers on AM registration.");
     numAllocatedContainers.addAndGet(previousAMRunningContainers.size());

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java?rev=1572232&r1=1572231&r2=1572232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java Wed Feb 26 20:20:39 2014
@@ -195,6 +195,12 @@ public class AMRMClientImpl<T extends Co
           appTrackingUrl);
     RegisterApplicationMasterResponse response =
         rmClient.registerApplicationMaster(request);
+
+    synchronized (this) {
+      if(!response.getNMTokensFromPreviousAttempts().isEmpty()) {
+        populateNMTokens(response.getNMTokensFromPreviousAttempts());
+      }
+    }
     return response;
   }
 
@@ -250,7 +256,7 @@ public class AMRMClientImpl<T extends Co
         lastResponseId = allocateResponse.getResponseId();
         clusterAvailableResources = allocateResponse.getAvailableResources();
         if (!allocateResponse.getNMTokens().isEmpty()) {
-          populateNMTokens(allocateResponse);
+          populateNMTokens(allocateResponse.getNMTokens());
         }
       }
     } finally {
@@ -284,13 +290,17 @@ public class AMRMClientImpl<T extends Co
 
   @Private
   @VisibleForTesting
-  protected void populateNMTokens(AllocateResponse allocateResponse) {
-    for (NMToken token : allocateResponse.getNMTokens()) {
+  protected void populateNMTokens(List<NMToken> nmTokens) {
+    for (NMToken token : nmTokens) {
       String nodeId = token.getNodeId().toString();
       if (getNMTokenCache().containsToken(nodeId)) {
-        LOG.debug("Replacing token for : " + nodeId);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Replacing token for : " + nodeId);
+        }
       } else {
-        LOG.debug("Received new token for : " + nodeId);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Received new token for : " + nodeId);
+        }
       }
       getNMTokenCache().setToken(nodeId, token.getToken());
     }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java?rev=1572232&r1=1572231&r2=1572232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java Wed Feb 26 20:20:39 2014
@@ -31,13 +31,16 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProtoOrBuilder;
 
@@ -56,7 +59,8 @@ public class RegisterApplicationMasterRe
 
   private Resource maximumResourceCapability;
   private Map<ApplicationAccessType, String> applicationACLS = null;
-  private List<Container> containersFromPreviousAttempt = null;
+  private List<Container> containersFromPreviousAttempts = null;
+  private List<NMToken> nmTokens = null;
 
   public RegisterApplicationMasterResponsePBImpl() {
     builder = RegisterApplicationMasterResponseProto.newBuilder();
@@ -110,8 +114,13 @@ public class RegisterApplicationMasterRe
     if (this.applicationACLS != null) {
       addApplicationACLs();
     }
-    if (this.containersFromPreviousAttempt != null) {
-      addRunningContainersToProto();
+    if (this.containersFromPreviousAttempts != null) {
+      addContainersFromPreviousAttemptToProto();
+    }
+    if (nmTokens != null) {
+      builder.clearNmTokensFromPreviousAttempts();
+      Iterable<NMTokenProto> iterable = getTokenProtoIterable(nmTokens);
+      builder.addAllNmTokensFromPreviousAttempts(iterable);
     }
   }
 
@@ -236,21 +245,22 @@ public class RegisterApplicationMasterRe
   }
 
   @Override
-  public List<Container> getContainersFromPreviousAttempt() {
-    if (this.containersFromPreviousAttempt != null) {
-      return this.containersFromPreviousAttempt;
+  public List<Container> getContainersFromPreviousAttempts() {
+    if (this.containersFromPreviousAttempts != null) {
+      return this.containersFromPreviousAttempts;
     }
-    initRunningContainersList();
-    return this.containersFromPreviousAttempt;
+    initContainersPreviousAttemptList();
+    return this.containersFromPreviousAttempts;
   }
 
   @Override
-  public void setContainersFromPreviousAttempt(final List<Container> containers) {
+  public void
+      setContainersFromPreviousAttempts(final List<Container> containers) {
     if (containers == null) {
       return;
     }
-    this.containersFromPreviousAttempt = new ArrayList<Container>();
-    this.containersFromPreviousAttempt.addAll(containers);
+    this.containersFromPreviousAttempts = new ArrayList<Container>();
+    this.containersFromPreviousAttempts.addAll(containers);
   }
   
   @Override
@@ -272,25 +282,88 @@ public class RegisterApplicationMasterRe
     }
   }
 
-  private void initRunningContainersList() {
-    RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder;
-    List<ContainerProto> list = p.getContainersFromPreviousAttemptList();
-    containersFromPreviousAttempt = new ArrayList<Container>();
+
+  private void initContainersPreviousAttemptList() {
+    RegisterApplicationMasterResponseProtoOrBuilder p =
+        viaProto ? proto : builder;
+    List<ContainerProto> list = p.getContainersFromPreviousAttemptsList();
+    containersFromPreviousAttempts = new ArrayList<Container>();
     for (ContainerProto c : list) {
-      containersFromPreviousAttempt.add(convertFromProtoFormat(c));
+      containersFromPreviousAttempts.add(convertFromProtoFormat(c));
     }
   }
 
-  private void addRunningContainersToProto() {
+  private void addContainersFromPreviousAttemptToProto() {
     maybeInitBuilder();
-    builder.clearContainersFromPreviousAttempt();
+    builder.clearContainersFromPreviousAttempts();
     List<ContainerProto> list = new ArrayList<ContainerProto>();
-    for (Container c : containersFromPreviousAttempt) {
+    for (Container c : containersFromPreviousAttempts) {
       list.add(convertToProtoFormat(c));
     }
-    builder.addAllContainersFromPreviousAttempt(list);
+    builder.addAllContainersFromPreviousAttempts(list);
+  }
+
+
+  @Override
+  public List<NMToken> getNMTokensFromPreviousAttempts() {
+    if (nmTokens != null) {
+      return nmTokens;
+    }
+    initLocalNewNMTokenList();
+    return nmTokens;
   }
   
+  @Override
+  public void setNMTokensFromPreviousAttempts(final List<NMToken> nmTokens) {
+    if (nmTokens == null || nmTokens.isEmpty()) {
+      if (this.nmTokens != null) {
+        this.nmTokens.clear();
+      }
+      builder.clearNmTokensFromPreviousAttempts();
+      return;
+    }
+    this.nmTokens = new ArrayList<NMToken>();
+    this.nmTokens.addAll(nmTokens);
+  }
+
+  private synchronized void initLocalNewNMTokenList() {
+    RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder;
+    List<NMTokenProto> list = p.getNmTokensFromPreviousAttemptsList();
+    nmTokens = new ArrayList<NMToken>();
+    for (NMTokenProto t : list) {
+      nmTokens.add(convertFromProtoFormat(t));
+    }
+  }
+
+  private synchronized Iterable<NMTokenProto> getTokenProtoIterable(
+      final List<NMToken> nmTokenList) {
+    maybeInitBuilder();
+    return new Iterable<NMTokenProto>() {
+      @Override
+      public synchronized Iterator<NMTokenProto> iterator() {
+        return new Iterator<NMTokenProto>() {
+
+          Iterator<NMToken> iter = nmTokenList.iterator();
+
+          @Override
+          public boolean hasNext() {
+            return iter.hasNext();
+          }
+
+          @Override
+          public NMTokenProto next() {
+            return convertToProtoFormat(iter.next());
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+        };
+      }
+    };
+  }
+
   private Resource convertFromProtoFormat(ResourceProto resource) {
     return new ResourcePBImpl(resource);
   }
@@ -306,4 +379,12 @@ public class RegisterApplicationMasterRe
   private ContainerProto convertToProtoFormat(Container t) {
     return ((ContainerPBImpl) t).getProto();
   }
+
+  private NMTokenProto convertToProtoFormat(NMToken token) {
+    return ((NMTokenPBImpl) token).getProto();
+  }
+
+  private NMToken convertFromProtoFormat(NMTokenProto proto) {
+    return new NMTokenPBImpl(proto);
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java?rev=1572232&r1=1572231&r2=1572232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java Wed Feb 26 20:20:39 2014
@@ -47,7 +47,7 @@ public class NMTokenPBImpl extends NMTok
     this.proto = proto;
     viaProto = true;
   }
-  
+
   @Override
   public synchronized NodeId getNodeId() {
     NMTokenProtoOrBuilder p = viaProto ? proto : builder;

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1572232&r1=1572231&r2=1572232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Wed Feb 26 20:20:39 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.PreemptionContainer;
 import org.apache.hadoop.yarn.api.records.PreemptionContract;
@@ -280,10 +282,32 @@ public class ApplicationMasterService ex
             .getMasterKey(applicationAttemptId).getEncoded()));        
       }
 
-      List<Container> containerList =
+      // For work-preserving AM restart, retrieve previous attempts' containers
+      // and corresponding NM tokens.
+      List<Container> transferredContainers =
           ((AbstractYarnScheduler) rScheduler)
             .getTransferredContainers(applicationAttemptId);
-      response.setContainersFromPreviousAttempt(containerList);
+      if (!transferredContainers.isEmpty()) {
+        response.setContainersFromPreviousAttempts(transferredContainers);
+        List<NMToken> nmTokens = new ArrayList<NMToken>();
+        for (Container container : transferredContainers) {
+          try {
+            nmTokens.add(rmContext.getNMTokenSecretManager()
+              .createAndGetNMToken(app.getUser(), applicationAttemptId,
+                container));
+          } catch (IllegalArgumentException e) {
+            // if it's a DNS issue, throw UnknownHostException directly and that
+            // will be automatically retried by RMProxy in RPC layer.
+            if (e.getCause() instanceof UnknownHostException) {
+              throw (UnknownHostException) e.getCause();
+            }
+          }
+        }
+        response.setNMTokensFromPreviousAttempts(nmTokens);
+        LOG.info("Application " + appID + " retrieved "
+            + transferredContainers.size() + " containers from previous"
+            + " attempts and " + nmTokens.size() + " NM tokens.");
+      }
       return response;
     }
   }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1572232&r1=1572231&r2=1572232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Wed Feb 26 20:20:39 2014
@@ -385,9 +385,8 @@ public class SchedulerApplicationAttempt
         }
       } catch (IllegalArgumentException e) {
         // DNS might be down, skip returning this container.
-        LOG.error(
-          "Error trying to assign container token to allocated container "
-              + container.getId(), e);
+        LOG.error("Error trying to assign container token and NM token to" +
+            " an allocated container " + container.getId(), e);
         continue;
       }
       returnContainerList.add(container);

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1572232&r1=1572231&r2=1572232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Wed Feb 26 20:20:39 2014
@@ -486,6 +486,7 @@ public class MockRM extends ResourceMana
 
   public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
       throws Exception {
+    rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
     RMAppAttempt attempt = app.getCurrentAppAttempt();
     nm.nodeHeartbeat(true);
     MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java?rev=1572232&r1=1572231&r2=1572232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java Wed Feb 26 20:20:39 2014
@@ -24,6 +24,7 @@ import java.util.List;
 
 import junit.framework.Assert;
 
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
@@ -160,11 +162,11 @@ public class TestAMRestart {
         am2.registerAppAttempt();
 
     // Assert two containers are running: container2 and container3;
-    Assert.assertEquals(2, registerResponse.getContainersFromPreviousAttempt()
+    Assert.assertEquals(2, registerResponse.getContainersFromPreviousAttempts()
       .size());
     boolean containerId2Exists = false, containerId3Exists = false;
     for (Container container : registerResponse
-      .getContainersFromPreviousAttempt()) {
+      .getContainersFromPreviousAttempts()) {
       if (container.getId().equals(containerId2)) {
         containerId2Exists = true;
       }
@@ -232,4 +234,100 @@ public class TestAMRestart {
 
     rm1.stop();
   }
+
+  /** Checks that a restarted AM receives NMTokens of previous attempts. */
+  @Test
+  public void testNMTokensRebindOnAMRestart() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3);
+
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    RMApp app1 = rm1.submitApp(200, "myname", "myuser",
+          new HashMap<ApplicationAccessType, String>(), false, "default", -1,
+          null, "MAPREDUCE", false, true);
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    MockNM nm2 =
+        new MockNM("127.1.1.1:4321", 8000, rm1.getResourceTrackerService());
+    nm2.registerNode();
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    int NUM_CONTAINERS = 1;
+    List<Container> containers = new ArrayList<Container>();
+    // expectedNMTokens accumulates every NMToken returned by allocate calls.
+    List<NMToken> expectedNMTokens = new ArrayList<NMToken>();
+
+    // am1 allocates 1 container on nm1.
+    while (true) {
+      AllocateResponse response = am1.allocate("127.0.0.1", 2000,
+          NUM_CONTAINERS, new ArrayList<ContainerId>());
+      nm1.nodeHeartbeat(true);
+      containers.addAll(response.getAllocatedContainers());
+      expectedNMTokens.addAll(response.getNMTokens());
+      if (containers.size() == NUM_CONTAINERS) {
+        break;
+      }
+      Thread.sleep(200);
+      System.out.println("Waiting for container to be allocated.");
+    }
+    // launch the container on nm1
+    nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
+    ContainerId containerId2 =
+        ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
+
+    // fail am1 by reporting its container 1 (presumably the AM) as complete
+    nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    am1.waitForState(RMAppAttemptState.FAILED);
+    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+
+    // restart the am
+    MockAM am2 = MockRM.launchAM(app1, rm1, nm1);
+    RegisterApplicationMasterResponse registerResponse =
+        am2.registerAppAttempt();
+    rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+
+    // check am2 gets the NM token that was issued to am1.
+    Assert.assertEquals(expectedNMTokens,
+      registerResponse.getNMTokensFromPreviousAttempts());
+
+    // am2 allocates 1 container on nm2
+    containers = new ArrayList<Container>();
+    while (true) {
+      AllocateResponse allocateResponse = am2.allocate("127.1.1.1", 4000,
+          NUM_CONTAINERS, new ArrayList<ContainerId>());
+      nm2.nodeHeartbeat(true);
+      containers.addAll(allocateResponse.getAllocatedContainers());
+      expectedNMTokens.addAll(allocateResponse.getNMTokens());
+      if (containers.size() == NUM_CONTAINERS) {
+        break;
+      }
+      Thread.sleep(200);
+      System.out.println("Waiting for container to be allocated.");
+    }
+    // launch the container on nm2, the node it was requested on
+    nm2.nodeHeartbeat(am2.getApplicationAttemptId(), 2, ContainerState.RUNNING);
+    ContainerId am2ContainerId2 =
+        ContainerId.newInstance(am2.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm2, am2ContainerId2, RMContainerState.RUNNING);
+
+    // fail am2; it was launched via nm1, so nm1 reports its container 1.
+    nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    am2.waitForState(RMAppAttemptState.FAILED);
+    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+
+    // restart am
+    MockAM am3 = MockRM.launchAM(app1, rm1, nm1);
+    registerResponse = am3.registerAppAttempt();
+    rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+
+    // check am3 gets the NM tokens from both am1 and am2.
+    List<NMToken> transferredTokens =
+        registerResponse.getNMTokensFromPreviousAttempts();
+    Assert.assertEquals(2, transferredTokens.size());
+    Assert.assertTrue(transferredTokens.containsAll(expectedNMTokens));
+    rm1.stop();
+  }
 }