You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2018/05/18 03:01:06 UTC

[1/2] hadoop git commit: YARN-7900. [AMRMProxy] AMRMClientRelayer for stateful FederationInterceptor. (Botong Huang via asuresh)

Repository: hadoop
Updated Branches:
  refs/heads/trunk f749517cc -> 3159bffce


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
index e518b90..38181e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.SerializedException;
 import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.AMRMClientUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
 import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
@@ -78,7 +79,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/2] hadoop git commit: YARN-7900. [AMRMProxy] AMRMClientRelayer for stateful FederationInterceptor. (Botong Huang via asuresh)

Posted by as...@apache.org.
YARN-7900. [AMRMProxy] AMRMClientRelayer for stateful FederationInterceptor. (Botong Huang via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3159bffc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3159bffc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3159bffc

Branch: refs/heads/trunk
Commit: 3159bffce23abf35754da2d7d51de7d8c2631ae3
Parents: f749517
Author: Arun Suresh <as...@apache.org>
Authored: Thu May 17 20:00:52 2018 -0700
Committer: Arun Suresh <as...@apache.org>
Committed: Thu May 17 20:00:52 2018 -0700

----------------------------------------------------------------------
 .../yarn/client/api/impl/AMRMClientImpl.java    | 151 ++------
 .../hadoop/yarn/client/AMRMClientUtils.java     | 262 +++++++++++++
 .../hadoop/yarn/server/AMRMClientRelayer.java   | 364 +++++++++++++++++++
 .../failover/FederationProxyProviderUtil.java   |   2 +-
 .../apache/hadoop/yarn/server/package-info.java |  18 +
 .../server/scheduler/ResourceRequestSet.java    | 206 +++++++++++
 .../server/scheduler/ResourceRequestSetKey.java | 133 +++++++
 .../server/scheduler/SchedulerRequestKey.java   |   4 +-
 .../yarn/server/uam/UnmanagedAMPoolManager.java |   2 +-
 .../server/uam/UnmanagedApplicationManager.java |   2 +-
 .../yarn/server/utils/AMRMClientUtils.java      | 191 ----------
 .../yarn/server/MockResourceManagerFacade.java  |   2 +-
 .../yarn/server/TestAMRMClientRelayer.java      | 275 ++++++++++++++
 .../amrmproxy/FederationInterceptor.java        |   2 +-
 .../ApplicationMasterService.java               |   2 +-
 .../TestApplicationMasterLauncher.java          |   2 +-
 16 files changed, 1299 insertions(+), 319 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
index ef849b2..36c3cf1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
@@ -31,11 +31,9 @@ import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.AbstractMap.SimpleEntry;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -68,6 +66,7 @@ import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.client.AMRMClientUtils;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@@ -113,13 +112,9 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   protected final Set<String> blacklistRemovals = new HashSet<String>();
   private Map<Set<String>, PlacementConstraint> placementConstraints =
       new HashMap<>();
-  private Queue<Collection<SchedulingRequest>> batchedSchedulingRequests =
-      new LinkedList<>();
-  private Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests =
-      new ConcurrentHashMap<>();
 
   protected Map<String, Resource> resourceProfilesMap;
-  
+
   static class ResourceRequestInfo<T> {
     ResourceRequest remoteRequest;
     LinkedHashSet<T> containerRequests;
@@ -168,6 +163,10 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
       SimpleEntry<Container, UpdateContainerRequest>> pendingChange =
       new HashMap<>();
 
+  private List<SchedulingRequest> schedulingRequests = new ArrayList<>();
+  private Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests =
+      new HashMap<>();
+
   public AMRMClientImpl() {
     super(AMRMClientImpl.class.getName());
   }
@@ -252,18 +251,18 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
       this.resourceProfilesMap = response.getResourceProfiles();
       List<Container> prevContainers =
           response.getContainersFromPreviousAttempts();
-      removeFromOutstandingSchedulingRequests(prevContainers);
-      recreateSchedulingRequestBatch();
+      AMRMClientUtils.removeFromOutstandingSchedulingRequests(prevContainers,
+          this.outstandingSchedRequests);
     }
     return response;
   }
 
   @Override
-  public void addSchedulingRequests(
-      Collection<SchedulingRequest> schedulingRequests) {
-    synchronized (this.batchedSchedulingRequests) {
-      this.batchedSchedulingRequests.add(schedulingRequests);
-    }
+  public synchronized void addSchedulingRequests(
+      Collection<SchedulingRequest> newSchedulingRequests) {
+    this.schedulingRequests.addAll(newSchedulingRequests);
+    AMRMClientUtils.addToOutstandingSchedulingRequests(newSchedulingRequests,
+        this.outstandingSchedRequests);
   }
 
   @Override
@@ -279,6 +278,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     List<String> blacklistToRemove = new ArrayList<String>();
     Map<ContainerId, SimpleEntry<Container, UpdateContainerRequest>> oldChange =
         new HashMap<>();
+    List<SchedulingRequest> schedulingRequestList = new LinkedList<>();
+
     try {
       synchronized (this) {
         askList = cloneAsks();
@@ -286,10 +287,13 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
         oldChange.putAll(change);
         List<UpdateContainerRequest> updateList = createUpdateList();
         releaseList = new ArrayList<ContainerId>(release);
+        schedulingRequestList = new ArrayList<>(schedulingRequests);
+
         // optimistically clear this collection assuming no RPC failure
         ask.clear();
         release.clear();
         change.clear();
+        schedulingRequests.clear();
 
         blacklistToAdd.addAll(blacklistAdditions);
         blacklistToRemove.addAll(blacklistRemovals);
@@ -301,8 +305,9 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
         allocateRequest = AllocateRequest.newBuilder()
             .responseId(lastResponseId).progress(progressIndicator)
             .askList(askList).resourceBlacklistRequest(blacklistRequest)
-            .releaseList(releaseList).updateRequests(updateList).build();
-        populateSchedulingRequests(allocateRequest);
+            .releaseList(releaseList).updateRequests(updateList)
+            .schedulingRequests(schedulingRequestList).build();
+
         // clear blacklistAdditions and blacklistRemovals before
         // unsynchronized part
         blacklistAdditions.clear();
@@ -311,10 +316,6 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
 
       try {
         allocateResponse = rmClient.allocate(allocateRequest);
-        removeFromOutstandingSchedulingRequests(
-            allocateResponse.getAllocatedContainers());
-        removeFromOutstandingSchedulingRequests(
-            allocateResponse.getContainersFromPreviousAttempts());
       } catch (ApplicationMasterNotRegisteredException e) {
         LOG.warn("ApplicationMaster is out of sync with ResourceManager,"
             + " hence resyncing.");
@@ -331,6 +332,10 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
             }
           }
           change.putAll(this.pendingChange);
+          for (List<SchedulingRequest> schedReqs :
+              this.outstandingSchedRequests.values()) {
+            this.schedulingRequests.addAll(schedReqs);
+          }
         }
         // re register with RM
         registerApplicationMaster();
@@ -370,6 +375,12 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
             removePendingChangeRequests(changed);
           }
         }
+        AMRMClientUtils.removeFromOutstandingSchedulingRequests(
+            allocateResponse.getAllocatedContainers(),
+            this.outstandingSchedRequests);
+        AMRMClientUtils.removeFromOutstandingSchedulingRequests(
+            allocateResponse.getContainersFromPreviousAttempts(),
+            this.outstandingSchedRequests);
       }
     } finally {
       // TODO how to differentiate remote yarn exception vs error in rpc
@@ -410,108 +421,12 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
           }
           blacklistAdditions.addAll(blacklistToAdd);
           blacklistRemovals.addAll(blacklistToRemove);
-        }
-      }
-    }
-    return allocateResponse;
-  }
 
-  private void populateSchedulingRequests(AllocateRequest allocateRequest) {
-    synchronized (this.batchedSchedulingRequests) {
-      if (!this.batchedSchedulingRequests.isEmpty()) {
-        List<SchedulingRequest> newReqs = new LinkedList<>();
-        Iterator<Collection<SchedulingRequest>> iter =
-            this.batchedSchedulingRequests.iterator();
-        while (iter.hasNext()) {
-          Collection<SchedulingRequest> requests = iter.next();
-          newReqs.addAll(requests);
-          addToOutstandingSchedulingRequests(requests);
-          iter.remove();
-        }
-        allocateRequest.setSchedulingRequests(newReqs);
-      }
-    }
-  }
-
-  private void recreateSchedulingRequestBatch() {
-    List<SchedulingRequest> batched = new ArrayList<>();
-    synchronized (this.outstandingSchedRequests) {
-      for (List<SchedulingRequest> schedReqs :
-          this.outstandingSchedRequests.values()) {
-        batched.addAll(schedReqs);
-      }
-    }
-    synchronized (this.batchedSchedulingRequests) {
-      this.batchedSchedulingRequests.add(batched);
-    }
-  }
-
-  private void addToOutstandingSchedulingRequests(
-      Collection<SchedulingRequest> requests) {
-    for (SchedulingRequest req : requests) {
-      List<SchedulingRequest> schedulingRequests =
-          this.outstandingSchedRequests.computeIfAbsent(
-              req.getAllocationTags(), x -> new LinkedList<>());
-      SchedulingRequest matchingReq = null;
-      synchronized (schedulingRequests) {
-        for (SchedulingRequest schedReq : schedulingRequests) {
-          if (isMatching(req, schedReq)) {
-            matchingReq = schedReq;
-            break;
-          }
-        }
-        if (matchingReq != null) {
-          matchingReq.getResourceSizing().setNumAllocations(
-              req.getResourceSizing().getNumAllocations());
-        } else {
-          schedulingRequests.add(req);
-        }
-      }
-    }
-  }
-
-  private boolean isMatching(SchedulingRequest schedReq1,
-      SchedulingRequest schedReq2) {
-    return schedReq1.getPriority().equals(schedReq2.getPriority()) &&
-        schedReq1.getExecutionType().getExecutionType().equals(
-            schedReq1.getExecutionType().getExecutionType()) &&
-        schedReq1.getAllocationRequestId() ==
-            schedReq2.getAllocationRequestId();
-  }
-
-  private void removeFromOutstandingSchedulingRequests(
-      Collection<Container> containers) {
-    if (containers == null || containers.isEmpty()) {
-      return;
-    }
-    for (Container container : containers) {
-      if (container.getAllocationTags() != null &&
-          !container.getAllocationTags().isEmpty()) {
-        List<SchedulingRequest> schedReqs =
-            this.outstandingSchedRequests.get(container.getAllocationTags());
-        if (schedReqs != null && !schedReqs.isEmpty()) {
-          synchronized (schedReqs) {
-            Iterator<SchedulingRequest> iter = schedReqs.iterator();
-            while (iter.hasNext()) {
-              SchedulingRequest schedReq = iter.next();
-              if (schedReq.getPriority().equals(container.getPriority()) &&
-                  schedReq.getAllocationRequestId() ==
-                      container.getAllocationRequestId()) {
-                int numAllocations =
-                    schedReq.getResourceSizing().getNumAllocations();
-                numAllocations--;
-                if (numAllocations == 0) {
-                  iter.remove();
-                } else {
-                  schedReq.getResourceSizing()
-                      .setNumAllocations(numAllocations);
-                }
-              }
-            }
-          }
+          schedulingRequests.addAll(schedulingRequestList);
         }
       }
     }
+    return allocateResponse;
   }
 
   private List<UpdateContainerRequest> createUpdateList() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java
new file mode 100644
index 0000000..387e399
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
+import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for AMRMClient.
+ */
+@Private
+public final class AMRMClientUtils {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AMRMClientUtils.class);
+
+  public static final String APP_ALREADY_REGISTERED_MESSAGE =
+      "Application Master is already registered : ";
+
+  private AMRMClientUtils() {
+  }
+
+  /**
+   * Handle ApplicationNotRegistered exception and re-register.
+   *
+   * <p>If the RM rejects the re-register because the attempt is already
+   * registered (reported via {@link InvalidApplicationMasterRequestException}
+   * whose message contains {@link #APP_ALREADY_REGISTERED_MESSAGE}), this is
+   * treated as success, since a concurrent thread has re-registered.
+   *
+   * @param appId application Id
+   * @param rmProxy RM proxy instance
+   * @param registerRequest the AM re-register request
+   * @throws YarnException if re-register fails
+   */
+  public static void handleNotRegisteredExceptionAndReRegister(
+      ApplicationId appId, ApplicationMasterProtocol rmProxy,
+      RegisterApplicationMasterRequest registerRequest) throws YarnException {
+    LOG.info("App attempt {} not registered, most likely due to RM failover. "
+        + " Trying to re-register.", appId);
+    try {
+      rmProxy.registerApplicationMaster(registerRequest);
+    } catch (Exception e) {
+      // Exception messages may be null; guard so an unexpected exception is
+      // still wrapped in a YarnException instead of escaping as an NPE.
+      String message = e.getMessage();
+      if (e instanceof InvalidApplicationMasterRequestException
+          && message != null
+          && message.contains(APP_ALREADY_REGISTERED_MESSAGE)) {
+        LOG.info("Concurrent thread successfully registered, moving on.");
+      } else {
+        LOG.error("Error trying to re-register AM", e);
+        throw new YarnException(e);
+      }
+    }
+  }
+
+  /**
+   * Helper method for client calling ApplicationMasterProtocol.allocate that
+   * handles re-register if RM fails over.
+   *
+   * <p>On {@link ApplicationMasterNotRegisteredException} the AM is
+   * re-registered, the request's response id is reset to 0 and allocate is
+   * retried (recursively) until it succeeds or re-register fails.
+   *
+   * @param request allocate request; its response id may be reset to 0
+   * @param rmProxy RM proxy
+   * @param registerRequest the register request for re-register
+   * @param appId application id
+   * @return allocate response
+   * @throws YarnException if RM call fails
+   * @throws IOException if RM call fails
+   */
+  public static AllocateResponse allocateWithReRegister(AllocateRequest request,
+      ApplicationMasterProtocol rmProxy,
+      RegisterApplicationMasterRequest registerRequest, ApplicationId appId)
+      throws YarnException, IOException {
+    try {
+      return rmProxy.allocate(request);
+    } catch (ApplicationMasterNotRegisteredException e) {
+      handleNotRegisteredExceptionAndReRegister(appId, rmProxy,
+          registerRequest);
+      // reset responseId after re-register
+      request.setResponseId(0);
+      // retry allocate
+      return allocateWithReRegister(request, rmProxy, registerRequest, appId);
+    }
+  }
+
+  /**
+   * Helper method for client calling
+   * ApplicationMasterProtocol.finishApplicationMaster that handles re-register
+   * if RM fails over.
+   *
+   * <p>On {@link ApplicationMasterNotRegisteredException} the AM is
+   * re-registered and finishApplicationMaster is retried (recursively).
+   *
+   * @param request finishApplicationMaster request
+   * @param rmProxy RM proxy
+   * @param registerRequest the register request for re-register
+   * @param appId application id
+   * @return finishApplicationMaster response
+   * @throws YarnException if RM call fails
+   * @throws IOException if RM call fails
+   */
+  public static FinishApplicationMasterResponse finishAMWithReRegister(
+      FinishApplicationMasterRequest request, ApplicationMasterProtocol rmProxy,
+      RegisterApplicationMasterRequest registerRequest, ApplicationId appId)
+      throws YarnException, IOException {
+    try {
+      return rmProxy.finishApplicationMaster(request);
+    } catch (ApplicationMasterNotRegisteredException ex) {
+      handleNotRegisteredExceptionAndReRegister(appId, rmProxy,
+          registerRequest);
+      // retry finishAM after re-register
+      return finishAMWithReRegister(request, rmProxy, registerRequest, appId);
+    }
+  }
+
+  /**
+   * Create a proxy for the specified protocol.
+   *
+   * <p>When {@code token} is non-null it is added to {@code user}, its service
+   * is rewritten to the AMRM token service, and {@code configuration} is
+   * switched to TOKEN authentication; both arguments are mutated.
+   *
+   * @param configuration Configuration to generate {@link ClientRMProxy}
+   * @param protocol Protocol for the proxy
+   * @param user the user on whose behalf the proxy is being created
+   * @param token the auth token to use for connection
+   * @param <T> Type information of the proxy
+   * @return Proxy to the RM
+   * @throws IOException on failure
+   */
+  @Public
+  @Unstable
+  public static <T> T createRMProxy(final Configuration configuration,
+      final Class<T> protocol, UserGroupInformation user,
+      final Token<? extends TokenIdentifier> token) throws IOException {
+    try {
+      String rmClusterId = configuration.get(YarnConfiguration.RM_CLUSTER_ID,
+          YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
+      LOG.info("Creating RMProxy to RM {} for protocol {} for user {}",
+          rmClusterId, protocol.getSimpleName(), user);
+      if (token != null) {
+        // preserve the token service sent by the RM when adding the token
+        // to ensure we replace the previous token setup by the RM.
+        // Afterwards we can update the service address for the RPC layer.
+        // Same as YarnServerSecurityUtils.updateAMRMToken()
+        user.addToken(token);
+        token.setService(ClientRMProxy.getAMRMTokenService(configuration));
+        setAuthModeInConf(configuration);
+      }
+      final T proxyConnection = user.doAs(new PrivilegedExceptionAction<T>() {
+        @Override
+        public T run() throws Exception {
+          return ClientRMProxy.createRMProxy(configuration, protocol);
+        }
+      });
+      return proxyConnection;
+
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so callers up the stack can observe it.
+      Thread.currentThread().interrupt();
+      throw new YarnRuntimeException(e);
+    }
+  }
+
+  /** Switch the RPC authentication method in {@code conf} to TOKEN. */
+  private static void setAuthModeInConf(Configuration conf) {
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        SaslRpcServer.AuthMethod.TOKEN.toString());
+  }
+
+  /**
+   * Merge new scheduling requests into the outstanding-request map, which is
+   * keyed by allocation tags.
+   *
+   * <p>If a request under the same tags matches a new one (see
+   * {@link #isMatchingSchedulingRequests}), the existing request's number of
+   * allocations is overwritten with the new value; otherwise the new request
+   * is appended. This method does no locking; callers must guard the map.
+   *
+   * @param requests the newly added scheduling requests
+   * @param outstandingSchedRequests the map to update in place
+   */
+  public static void addToOutstandingSchedulingRequests(
+      Collection<SchedulingRequest> requests,
+      Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests) {
+    for (SchedulingRequest req : requests) {
+      List<SchedulingRequest> schedulingRequests = outstandingSchedRequests
+          .computeIfAbsent(req.getAllocationTags(), x -> new LinkedList<>());
+      SchedulingRequest matchingReq = null;
+      for (SchedulingRequest schedReq : schedulingRequests) {
+        if (isMatchingSchedulingRequests(req, schedReq)) {
+          matchingReq = schedReq;
+          break;
+        }
+      }
+      if (matchingReq != null) {
+        matchingReq.getResourceSizing()
+            .setNumAllocations(req.getResourceSizing().getNumAllocations());
+      } else {
+        schedulingRequests.add(req);
+      }
+    }
+  }
+
+  /**
+   * Check whether two scheduling requests refer to the same logical ask, i.e.
+   * they share priority, execution type and allocation request id.
+   *
+   * @param schedReq1 first scheduling request
+   * @param schedReq2 second scheduling request
+   * @return true if the two requests match
+   */
+  public static boolean isMatchingSchedulingRequests(
+      SchedulingRequest schedReq1, SchedulingRequest schedReq2) {
+    // Compare schedReq1 against schedReq2 on every field; the execution type
+    // must come from both requests, not from schedReq1 twice.
+    return schedReq1.getPriority().equals(schedReq2.getPriority()) &&
+        schedReq1.getExecutionType().getExecutionType().equals(
+            schedReq2.getExecutionType().getExecutionType()) &&
+        schedReq1.getAllocationRequestId() ==
+            schedReq2.getAllocationRequestId();
+  }
+
+  /**
+   * Decrement outstanding scheduling requests for newly received containers.
+   *
+   * <p>For each container with non-empty allocation tags, every outstanding
+   * request under those tags with the same priority and allocation request
+   * id has its number of allocations decremented by one, and is removed once
+   * that count drops to zero or below. This method does no locking; callers
+   * must guard the map.
+   *
+   * @param containers the allocated containers; may be null or empty
+   * @param outstandingSchedRequests the map to update in place
+   */
+  public static void removeFromOutstandingSchedulingRequests(
+      Collection<Container> containers,
+      Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests) {
+    if (containers == null || containers.isEmpty()) {
+      return;
+    }
+    for (Container container : containers) {
+      if (container.getAllocationTags() != null
+          && !container.getAllocationTags().isEmpty()) {
+        List<SchedulingRequest> schedReqs =
+            outstandingSchedRequests.get(container.getAllocationTags());
+        if (schedReqs != null && !schedReqs.isEmpty()) {
+          Iterator<SchedulingRequest> iter = schedReqs.iterator();
+          while (iter.hasNext()) {
+            SchedulingRequest schedReq = iter.next();
+            if (schedReq.getPriority().equals(container.getPriority())
+                && schedReq.getAllocationRequestId() == container
+                    .getAllocationRequestId()) {
+              int numAllocations =
+                  schedReq.getResourceSizing().getNumAllocations();
+              numAllocations--;
+              // Use <= so a request that was already at zero is dropped
+              // rather than lingering with a negative count.
+              if (numAllocations <= 0) {
+                iter.remove();
+              } else {
+                schedReq.getResourceSizing().setNumAllocations(numAllocations);
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
new file mode 100644
index 0000000..c216ace
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
@@ -0,0 +1,364 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+import org.apache.hadoop.yarn.client.AMRMClientUtils;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet;
+import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSetKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A component that sits in between AMRMClient(Impl) and Yarn RM. It remembers
+ * pending requests similar to AMRMClient, and handles RM re-sync automatically
+ * without propagating the re-sync exception back to AMRMClient.
+ */
+public class AMRMClientRelayer extends AbstractService
+    implements ApplicationMasterProtocol {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AMRMClientRelayer.class);
+
+  private ApplicationMasterProtocol rmClient;
+
+  /**
+   * The original registration request that was sent by the AM. This instance is
+   * reused to register/re-register with all the sub-cluster RMs.
+   */
+  private RegisterApplicationMasterRequest amRegistrationRequest;
+
+  /**
+   * Similar to AMRMClientImpl, all data structures below have two versions:
+   *
+   * The remote ones are all the pending requests that RM has not fulfilled
+   * yet. Whenever RM fails over, we re-register and then fully re-send all
+   * these pending requests.
+   *
+   * The non-remote ones are the requests that RM has not received yet. When RM
+   * throws non-fail-over exception back, the request is considered not received
+   * by RM. We will merge with new requests and re-send in the next heartbeat.
+   */
+  private Map<ResourceRequestSetKey, ResourceRequestSet> remotePendingAsks =
+      new HashMap<>();
+  /**
+   * Same as AMRMClientImpl, we need to use a custom comparator that does not
+   * look at ResourceRequest.getNumContainers() here. TreeSet allows a custom
+   * comparator.
+   */
+  private Set<ResourceRequest> ask =
+      new TreeSet<>(new ResourceRequest.ResourceRequestComparator());
+
+  private Set<ContainerId> remotePendingRelease = new HashSet<>();
+  private Set<ContainerId> release = new HashSet<>();
+
+  private Set<String> remoteBlacklistedNodes = new HashSet<>();
+  private Set<String> blacklistAdditions = new HashSet<>();
+  private Set<String> blacklistRemovals = new HashSet<>();
+
+  private Map<ContainerId, UpdateContainerRequest> remotePendingChange =
+      new HashMap<>();
+  private Map<ContainerId, UpdateContainerRequest> change = new HashMap<>();
+
+  private Map<Set<String>, List<SchedulingRequest>> remotePendingSchedRequest =
+      new HashMap<>();
+  private List<SchedulingRequest> schedulingRequest = new ArrayList<>();
+
+  /**
+   * Create a relayer without an RM client; one is created through
+   * {@link ClientRMProxy} when the service starts.
+   */
+  public AMRMClientRelayer() {
+    super(AMRMClientRelayer.class.getName());
+  }
+
+  /**
+   * Create a relayer that forwards calls to the given RM client.
+   *
+   * @param rmClient the protocol proxy to relay requests to
+   */
+  public AMRMClientRelayer(ApplicationMasterProtocol rmClient) {
+    this();
+    this.rmClient = rmClient;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    final YarnConfiguration conf = new YarnConfiguration(getConfig());
+    try {
+      // Only build a proxy if none was injected through the constructor
+      if (this.rmClient == null) {
+        this.rmClient =
+            ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
+      }
+    } catch (IOException e) {
+      throw new YarnRuntimeException(e);
+    }
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (this.rmClient != null) {
+      RPC.stopProxy(this.rmClient);
+    }
+    super.serviceStop();
+  }
+
+  /**
+   * Register with the RM, remembering the request so that it can be replayed
+   * when the RM later reports that the AM is not registered.
+   */
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      RegisterApplicationMasterRequest request)
+      throws YarnException, IOException {
+    this.amRegistrationRequest = request;
+    return this.rmClient.registerApplicationMaster(request);
+  }
+
+  /**
+   * Finish the application attempt. If the RM reports the AM as not
+   * registered, re-register with the remembered request and retry.
+   */
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster(
+      FinishApplicationMasterRequest request)
+      throws YarnException, IOException {
+    try {
+      return this.rmClient.finishApplicationMaster(request);
+    } catch (ApplicationMasterNotRegisteredException e) {
+      if (this.amRegistrationRequest == null) {
+        // Never registered through this relayer, so there is nothing to replay
+        throw e;
+      }
+      LOG.warn("Out of sync with ResourceManager, hence resyncing.");
+      // re register with RM
+      registerApplicationMaster(this.amRegistrationRequest);
+      return finishApplicationMaster(request);
+    }
+  }
+
+  /**
+   * Relay an allocate heartbeat to the RM. New asks, releases, blacklist
+   * changes, container updates and scheduling requests are merged with any
+   * not-yet-accepted ones from earlier calls and sent together. On an
+   * {@link ApplicationMasterNotRegisteredException}, all remote-pending state
+   * is queued for re-send, the AM re-registers, and the call is retried.
+   */
+  @Override
+  public AllocateResponse allocate(AllocateRequest allocateRequest)
+      throws YarnException, IOException {
+    AllocateResponse allocateResponse = null;
+    try {
+      synchronized (this) {
+        // update the data structures first
+        addNewAsks(allocateRequest.getAskList());
+
+        if (allocateRequest.getReleaseList() != null) {
+          this.remotePendingRelease.addAll(allocateRequest.getReleaseList());
+          this.release.addAll(allocateRequest.getReleaseList());
+        }
+
+        if (allocateRequest.getResourceBlacklistRequest() != null) {
+          if (allocateRequest.getResourceBlacklistRequest()
+              .getBlacklistAdditions() != null) {
+            this.remoteBlacklistedNodes.addAll(allocateRequest
+                .getResourceBlacklistRequest().getBlacklistAdditions());
+            this.blacklistAdditions.addAll(allocateRequest
+                .getResourceBlacklistRequest().getBlacklistAdditions());
+          }
+          if (allocateRequest.getResourceBlacklistRequest()
+              .getBlacklistRemovals() != null) {
+            this.remoteBlacklistedNodes.removeAll(allocateRequest
+                .getResourceBlacklistRequest().getBlacklistRemovals());
+            this.blacklistRemovals.addAll(allocateRequest
+                .getResourceBlacklistRequest().getBlacklistRemovals());
+          }
+        }
+
+        if (allocateRequest.getUpdateRequests() != null) {
+          for (UpdateContainerRequest update : allocateRequest
+              .getUpdateRequests()) {
+            this.remotePendingChange.put(update.getContainerId(), update);
+            this.change.put(update.getContainerId(), update);
+          }
+        }
+
+        if (allocateRequest.getSchedulingRequests() != null) {
+          AMRMClientUtils.addToOutstandingSchedulingRequests(
+              allocateRequest.getSchedulingRequests(),
+              this.remotePendingSchedRequest);
+          this.schedulingRequest
+              .addAll(allocateRequest.getSchedulingRequests());
+        }
+
+        ArrayList<ResourceRequest> askList = new ArrayList<>(ask.size());
+        for (ResourceRequest r : ask) {
+          // create a copy of ResourceRequest as we might change it while the
+          // RPC layer is using it to send info across
+          askList.add(ResourceRequest.newBuilder().priority(r.getPriority())
+              .resourceName(r.getResourceName()).capability(r.getCapability())
+              .numContainers(r.getNumContainers())
+              .relaxLocality(r.getRelaxLocality())
+              .nodeLabelExpression(r.getNodeLabelExpression())
+              .executionTypeRequest(r.getExecutionTypeRequest())
+              .allocationRequestId(r.getAllocationRequestId()).build());
+        }
+
+        allocateRequest = AllocateRequest.newBuilder()
+            .responseId(allocateRequest.getResponseId())
+            .progress(allocateRequest.getProgress()).askList(askList)
+            .releaseList(new ArrayList<>(this.release))
+            .resourceBlacklistRequest(ResourceBlacklistRequest.newInstance(
+                new ArrayList<>(this.blacklistAdditions),
+                new ArrayList<>(this.blacklistRemovals)))
+            .updateRequests(new ArrayList<>(this.change.values()))
+            .schedulingRequests(new ArrayList<>(this.schedulingRequest))
+            .build();
+      }
+
+      // Do the actual allocate call
+      try {
+        allocateResponse = this.rmClient.allocate(allocateRequest);
+      } catch (ApplicationMasterNotRegisteredException e) {
+        if (this.amRegistrationRequest == null) {
+          // Never registered through this relayer, so there is nothing to
+          // replay; pending state is kept because allocateResponse is null
+          throw e;
+        }
+        LOG.warn("ApplicationMaster is out of sync with ResourceManager,"
+            + " hence resyncing.");
+
+        synchronized (this) {
+          // Add all remotePending data into to-send data structures
+          for (ResourceRequestSet requestSet : this.remotePendingAsks
+              .values()) {
+            for (ResourceRequest rr : requestSet.getRRs()) {
+              addResourceRequestToAsk(rr);
+            }
+          }
+          this.release.addAll(this.remotePendingRelease);
+          this.blacklistAdditions.addAll(this.remoteBlacklistedNodes);
+          this.change.putAll(this.remotePendingChange);
+          for (List<SchedulingRequest> reqs : this.remotePendingSchedRequest
+              .values()) {
+            this.schedulingRequest.addAll(reqs);
+          }
+        }
+
+        // re register with RM, then retry allocate recursively
+        registerApplicationMaster(this.amRegistrationRequest);
+        // A fresh registration starts a new allocate sequence, so restart the
+        // heartbeat from responseId 0 instead of the stale id (mirrors
+        // AMRMClientImpl resetting lastResponseId on re-register).
+        allocateRequest.setResponseId(0);
+        return allocate(allocateRequest);
+      }
+
+      synchronized (this) {
+        // Process the allocate response from RM
+        if (allocateResponse.getCompletedContainersStatuses() != null) {
+          for (ContainerStatus container : allocateResponse
+              .getCompletedContainersStatuses()) {
+            this.remotePendingRelease.remove(container.getContainerId());
+            this.remotePendingChange.remove(container.getContainerId());
+          }
+        }
+
+        if (allocateResponse.getUpdatedContainers() != null) {
+          for (UpdatedContainer updatedContainer : allocateResponse
+              .getUpdatedContainers()) {
+            this.remotePendingChange
+                .remove(updatedContainer.getContainer().getId());
+          }
+        }
+
+        AMRMClientUtils.removeFromOutstandingSchedulingRequests(
+            allocateResponse.getAllocatedContainers(),
+            this.remotePendingSchedRequest);
+        AMRMClientUtils.removeFromOutstandingSchedulingRequests(
+            allocateResponse.getContainersFromPreviousAttempts(),
+            this.remotePendingSchedRequest);
+      }
+
+    } finally {
+      synchronized (this) {
+        /*
+         * If allocateResponse is null, it means exception happened and RM did
+         * not accept the request. Don't clear any data structures so that they
+         * will be re-sent next time.
+         *
+         * Otherwise request was accepted by RM, we are safe to clear these.
+         */
+        if (allocateResponse != null) {
+          this.ask.clear();
+          this.release.clear();
+
+          this.blacklistAdditions.clear();
+          this.blacklistRemovals.clear();
+
+          this.change.clear();
+          this.schedulingRequest.clear();
+        }
+      }
+    }
+    return allocateResponse;
+  }
+
+  /**
+   * Merge new asks into both the to-send set and the remote-pending sets, then
+   * drop request sets that became cancels and prune zero non-ANY RRs.
+   *
+   * @param asks the new asks; {@code null} is treated as empty
+   * @throws YarnException if an ask is malformed or incompatible
+   */
+  private void addNewAsks(List<ResourceRequest> asks) throws YarnException {
+    if (asks == null) {
+      return;
+    }
+    Set<ResourceRequestSetKey> touchedKeys = new HashSet<>();
+    for (ResourceRequest rr : asks) {
+      addResourceRequestToAsk(rr);
+
+      ResourceRequestSetKey key = new ResourceRequestSetKey(rr);
+      touchedKeys.add(key);
+
+      ResourceRequestSet askSet = this.remotePendingAsks.get(key);
+      if (askSet == null) {
+        askSet = new ResourceRequestSet(key);
+        this.remotePendingAsks.put(key, askSet);
+      }
+      askSet.addAndOverrideRR(rr);
+    }
+
+    // Cleanup properly if needed
+    for (ResourceRequestSetKey key : touchedKeys) {
+      ResourceRequestSet askSet = this.remotePendingAsks.get(key);
+      if (askSet.getNumContainers() == 0) {
+        this.remotePendingAsks.remove(key);
+      } else {
+        // Remove non-any zero RRs
+        askSet.cleanupZeroNonAnyRR();
+      }
+    }
+  }
+
+  private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
+    // The ResourceRequestComparator doesn't look at container count when
+    // comparing. So we need to make sure the new RR override the old if any
+    this.ask.remove(remoteRequest);
+    this.ask.add(remoteRequest);
+  }
+
+  @VisibleForTesting
+  protected Map<ResourceRequestSetKey, ResourceRequestSet>
+      getRemotePendingAsks() {
+    return this.remotePendingAsks;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java
index 3931f2b..91924da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java
@@ -27,12 +27,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.client.AMRMClientUtils;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.client.RMFailoverProxyProvider;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
-import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/package-info.java
new file mode 100644
index 0000000..6289500
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSet.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSet.java
new file mode 100644
index 0000000..b1e6b6e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSet.java
@@ -0,0 +1,206 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.scheduler;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * A set of resource requests of the same scheduler key
+ * {@link ResourceRequestSetKey}, indexed by resource name.
+ */
+public class ResourceRequestSet {
+
+  private ResourceRequestSetKey key;
+  private int numContainers;
+  // ResourceName -> RR
+  private Map<String, ResourceRequest> asks;
+
+  /**
+   * Create an empty set with given key.
+   *
+   * @param key the key of the request set
+   * @throws YarnException if fails
+   */
+  public ResourceRequestSet(ResourceRequestSetKey key) throws YarnException {
+    this.key = key;
+    // leave it zero for now, as if it is a cancel
+    this.numContainers = 0;
+    this.asks = new HashMap<>();
+  }
+
+  /**
+   * Create a shallow copy of the request set.
+   *
+   * @param other the set to copy from
+   */
+  public ResourceRequestSet(ResourceRequestSet other) {
+    this.key = other.key;
+    this.numContainers = other.numContainers;
+    this.asks = new HashMap<>();
+    // The assumption is that the RR objects should not be modified without
+    // making a copy
+    this.asks.putAll(other.asks);
+  }
+
+  /**
+   * Add a {@link ResourceRequest} into the requestSet. If there's already an RR
+   * with the same resource name, override it and update accordingly.
+   *
+   * @param ask the new {@link ResourceRequest}
+   * @throws YarnException if the ask's key differs from this set's key, or if
+   *           the resulting numContainers is negative
+   */
+  public void addAndOverrideRR(ResourceRequest ask) throws YarnException {
+    if (!this.key.equals(new ResourceRequestSetKey(ask))) {
+      throw new YarnException(
+          "None compatible asks: \n" + ask + "\n" + this.key);
+    }
+
+    // Override directly if exists
+    this.asks.put(ask.getResourceName(), ask);
+
+    if (this.key.getExeType().equals(ExecutionType.GUARANTEED)) {
+      // For G requestSet, update the numContainers only for ANY RR
+      if (ask.getResourceName().equals(ResourceRequest.ANY)) {
+        this.numContainers = ask.getNumContainers();
+      }
+    } else {
+      // The assumption we made about O asks is that all RR in a requestSet has
+      // the same numContainers value. So we just take the value of the last RR
+      this.numContainers = ask.getNumContainers();
+    }
+    if (this.numContainers < 0) {
+      throw new YarnException("numContainers becomes " + this.numContainers
+          + " when adding ask " + ask + "\n requestSet: " + toString());
+    }
+  }
+
+  /**
+   * Merge a requestSet into this one.
+   *
+   * @param requestSet the requestSet to merge; {@code null} is a no-op
+   * @throws YarnException if any of its RRs cannot be added
+   */
+  public void addAndOverrideRRSet(ResourceRequestSet requestSet)
+      throws YarnException {
+    if (requestSet == null) {
+      return;
+    }
+    for (ResourceRequest rr : requestSet.getRRs()) {
+      addAndOverrideRR(rr);
+    }
+  }
+
+  /**
+   * Remove all non-ANY ResourceRequests whose numContainers is zero. This is
+   * necessary cleanup to avoid the requestSet getting too big.
+   */
+  public void cleanupZeroNonAnyRR() {
+    Iterator<Entry<String, ResourceRequest>> iter =
+        this.asks.entrySet().iterator();
+    while (iter.hasNext()) {
+      Entry<String, ResourceRequest> entry = iter.next();
+      if (entry.getKey().equals(ResourceRequest.ANY)) {
+        // Do not delete ANY RR
+        continue;
+      }
+      if (entry.getValue().getNumContainers() == 0) {
+        iter.remove();
+      }
+    }
+  }
+
+  public Map<String, ResourceRequest> getAsks() {
+    return this.asks;
+  }
+
+  public Collection<ResourceRequest> getRRs() {
+    return this.asks.values();
+  }
+
+  public int getNumContainers() {
+    return this.numContainers;
+  }
+
+  /**
+   * Force set the # of containers to ask for this requestSet to a given value.
+   * The set is left unchanged if an exception is thrown.
+   *
+   * @param newValue the new # of containers value
+   * @throws YarnException if this is a cancel requestSet, if newValue is
+   *           negative, or if a GUARANTEED set has no ANY RR
+   */
+  public void setNumContainers(int newValue) throws YarnException {
+    if (this.numContainers == 0) {
+      throw new YarnException("should not set numContainers to " + newValue
+          + " for a cancel requestSet: " + toString());
+    }
+    if (newValue < 0) {
+      // Same invariant that addAndOverrideRR enforces
+      throw new YarnException("should not set numContainers to " + newValue
+          + " for requestSet: " + toString());
+    }
+
+    // Clone the ResourceRequest object whenever we need to change it, since
+    // RR instances may be shared with copies of this set
+    int oldValue = this.numContainers;
+    if (this.key.getExeType().equals(ExecutionType.OPPORTUNISTIC)) {
+      // The assumption we made about O asks is that all RR in a requestSet has
+      // the same numContainers value
+      Map<String, ResourceRequest> newAsks = new HashMap<>();
+      for (ResourceRequest rr : this.asks.values()) {
+        ResourceRequest clone = cloneResourceRequest(rr);
+        clone.setNumContainers(newValue);
+        newAsks.put(clone.getResourceName(), clone);
+      }
+      this.asks = newAsks;
+    } else {
+      ResourceRequest rr = this.asks.get(ResourceRequest.ANY);
+      if (rr == null) {
+        throw new YarnException(
+            "No ANY RR found in requestSet with numContainers=" + oldValue);
+      }
+      ResourceRequest clone = cloneResourceRequest(rr);
+      clone.setNumContainers(newValue);
+      this.asks.put(ResourceRequest.ANY, clone);
+    }
+    // Update the count only after the RRs were rewritten successfully, so a
+    // failure above cannot leave numContainers out of sync with the RRs
+    this.numContainers = newValue;
+  }
+
+  private ResourceRequest cloneResourceRequest(ResourceRequest rr) {
+    return ResourceRequest.newBuilder().priority(rr.getPriority())
+        .resourceName(rr.getResourceName()).capability(rr.getCapability())
+        .numContainers(rr.getNumContainers())
+        .relaxLocality(rr.getRelaxLocality())
+        .nodeLabelExpression(rr.getNodeLabelExpression())
+        .executionTypeRequest(rr.getExecutionTypeRequest())
+        .allocationRequestId(rr.getAllocationRequestId()).build();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append('{').append(this.key.toString());
+    for (Entry<String, ResourceRequest> entry : this.asks.entrySet()) {
+      builder.append(' ').append(entry.getValue().getNumContainers())
+          .append(':').append(entry.getKey());
+    }
+    builder.append('}');
+    return builder.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSetKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSetKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSetKey.java
new file mode 100644
index 0000000..4db88ef
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSetKey.java
@@ -0,0 +1,133 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * The scheduler key for a group of {@link ResourceRequest}.
+ *
+ * TODO: after YARN-7631 is fixed by adding Resource and ExecType into
+ * SchedulerRequestKey, then we can directly use that.
+ */
+public class ResourceRequestSetKey extends SchedulerRequestKey {
+
+  // More ResourceRequest key fields on top of SchedulerRequestKey
+  private final Resource resource;
+  private final ExecutionType execType;
+
+  /**
+   * Create the key object from a {@link ResourceRequest}.
+   *
+   * @param rr Resource request object
+   * @throws YarnException if the RR has a null priority or capability
+   */
+  public ResourceRequestSetKey(ResourceRequest rr) throws YarnException {
+    this(rr.getAllocationRequestId(), rr.getPriority(), rr.getCapability(),
+        ((rr.getExecutionTypeRequest() == null) ? ExecutionType.GUARANTEED
+            : rr.getExecutionTypeRequest().getExecutionType()));
+    if (rr.getPriority() == null) {
+      throw new YarnException("Null priority in RR: " + rr);
+    }
+    if (rr.getCapability() == null) {
+      throw new YarnException("Null resource in RR: " + rr);
+    }
+  }
+
+  /**
+   * Create the key object from member objects.
+   *
+   * @param allocationRequestId allocate request id of the ask
+   * @param priority the priority of the ask
+   * @param resource the resource size of the ask; null means zero resource
+   * @param execType the execution type of the ask; null means GUARANTEED
+   */
+  public ResourceRequestSetKey(long allocationRequestId, Priority priority,
+      Resource resource, ExecutionType execType) {
+    super(priority, allocationRequestId, null);
+
+    if (resource == null) {
+      this.resource = Resource.newInstance(0, 0);
+    } else {
+      this.resource = resource;
+    }
+    if (execType == null) {
+      this.execType = ExecutionType.GUARANTEED;
+    } else {
+      this.execType = execType;
+    }
+  }
+
+  public Resource getResource() {
+    return this.resource;
+  }
+
+  public ExecutionType getExeType() {
+    return this.execType;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    // instanceof is false for null, so no separate null check is needed
+    if (!(obj instanceof SchedulerRequestKey)) {
+      return false;
+    }
+    if (!(obj instanceof ResourceRequestSetKey)) {
+      return super.equals(obj);
+    }
+    ResourceRequestSetKey other = (ResourceRequestSetKey) obj;
+    return super.equals(other) && this.resource.equals(other.resource)
+        && this.execType.equals(other.execType);
+  }
+
+  @Override
+  public int hashCode() {
+    return ((super.hashCode() * 37 + this.resource.hashCode()) * 41)
+        + this.execType.hashCode();
+  }
+
+  @Override
+  public int compareTo(SchedulerRequestKey other) {
+    int ret = super.compareTo(other);
+    if (ret != 0) {
+      return ret;
+    }
+    if (!(other instanceof ResourceRequestSetKey)) {
+      return ret;
+    }
+
+    ResourceRequestSetKey otherKey = (ResourceRequestSetKey) other;
+    ret = this.resource.compareTo(otherKey.resource);
+    if (ret != 0) {
+      return ret;
+    }
+    return this.execType.compareTo(otherKey.execType);
+  }
+
+  /**
+   * Render as {@code [id:<allocId> p:<priority> G|O r:<resource>]}. The
+   * conditional is parenthesized on its own so that the resource and the
+   * closing bracket are printed for both execution types.
+   */
+  @Override
+  public String toString() {
+    Priority priority = getPriority();
+    return "[id:" + getAllocationRequestId() + " p:"
+        + (priority == null ? "null" : String.valueOf(priority.getPriority()))
+        + (this.execType.equals(ExecutionType.GUARANTEED) ? " G" : " O")
+        + " r:" + this.resource + "]";
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
index 0fce083..c3b08d6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
  * Composite key for outstanding scheduler requests for any schedulable entity.
  * Currently it includes {@link Priority}.
  */
-public final class SchedulerRequestKey implements
+public class SchedulerRequestKey implements
     Comparable<SchedulerRequestKey> {
 
   private final Priority priority;
@@ -73,8 +73,6 @@ public final class SchedulerRequestKey implements
         container.getAllocationRequestId(), null);
   }
 
-
-
   public SchedulerRequestKey(Priority priority, long allocationRequestId,
       ContainerId containerToUpdate) {
     this.priority = priority;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
index 677c4e6..02eef29 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
@@ -47,9 +47,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.client.AMRMClientUtils;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
 import org.apache.hadoop.yarn.util.AsyncCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
index 3f4a110..10985e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
@@ -56,13 +56,13 @@ import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.AMRMClientUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
 import org.apache.hadoop.yarn.util.AsyncCallback;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java
deleted file mode 100644
index 37e2b5e..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.utils;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.security.SaslRpcServer;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.client.ClientRMProxy;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
-import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Utility class for AMRMClient.
- */
-@Private
-public final class AMRMClientUtils {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(AMRMClientUtils.class);
-
-  public static final String APP_ALREADY_REGISTERED_MESSAGE =
-      "Application Master is already registered : ";
-
-  private AMRMClientUtils() {
-  }
-
-  /**
-   * Handle ApplicationNotRegistered exception and re-register.
-   *
-   * @param appId application Id
-   * @param rmProxy RM proxy instance
-   * @param registerRequest the AM re-register request
-   * @throws YarnException if re-register fails
-   */
-  public static void handleNotRegisteredExceptionAndReRegister(
-      ApplicationId appId, ApplicationMasterProtocol rmProxy,
-      RegisterApplicationMasterRequest registerRequest) throws YarnException {
-    LOG.info("App attempt {} not registered, most likely due to RM failover. "
-        + " Trying to re-register.", appId);
-    try {
-      rmProxy.registerApplicationMaster(registerRequest);
-    } catch (Exception e) {
-      if (e instanceof InvalidApplicationMasterRequestException
-          && e.getMessage().contains(APP_ALREADY_REGISTERED_MESSAGE)) {
-        LOG.info("Concurrent thread successfully registered, moving on.");
-      } else {
-        LOG.error("Error trying to re-register AM", e);
-        throw new YarnException(e);
-      }
-    }
-  }
-
-  /**
-   * Helper method for client calling ApplicationMasterProtocol.allocate that
-   * handles re-register if RM fails over.
-   *
-   * @param request allocate request
-   * @param rmProxy RM proxy
-   * @param registerRequest the register request for re-register
-   * @param appId application id
-   * @return allocate response
-   * @throws YarnException if RM call fails
-   * @throws IOException if RM call fails
-   */
-  public static AllocateResponse allocateWithReRegister(AllocateRequest request,
-      ApplicationMasterProtocol rmProxy,
-      RegisterApplicationMasterRequest registerRequest, ApplicationId appId)
-      throws YarnException, IOException {
-    try {
-      return rmProxy.allocate(request);
-    } catch (ApplicationMasterNotRegisteredException e) {
-      handleNotRegisteredExceptionAndReRegister(appId, rmProxy,
-          registerRequest);
-      // reset responseId after re-register
-      request.setResponseId(0);
-      // retry allocate
-      return allocateWithReRegister(request, rmProxy, registerRequest, appId);
-    }
-  }
-
-  /**
-   * Helper method for client calling
-   * ApplicationMasterProtocol.finishApplicationMaster that handles re-register
-   * if RM fails over.
-   *
-   * @param request finishApplicationMaster request
-   * @param rmProxy RM proxy
-   * @param registerRequest the register request for re-register
-   * @param appId application id
-   * @return finishApplicationMaster response
-   * @throws YarnException if RM call fails
-   * @throws IOException if RM call fails
-   */
-  public static FinishApplicationMasterResponse finishAMWithReRegister(
-      FinishApplicationMasterRequest request, ApplicationMasterProtocol rmProxy,
-      RegisterApplicationMasterRequest registerRequest, ApplicationId appId)
-      throws YarnException, IOException {
-    try {
-      return rmProxy.finishApplicationMaster(request);
-    } catch (ApplicationMasterNotRegisteredException ex) {
-      handleNotRegisteredExceptionAndReRegister(appId, rmProxy,
-          registerRequest);
-      // retry finishAM after re-register
-      return finishAMWithReRegister(request, rmProxy, registerRequest, appId);
-    }
-  }
-
-  /**
-   * Create a proxy for the specified protocol.
-   *
-   * @param configuration Configuration to generate {@link ClientRMProxy}
-   * @param protocol Protocol for the proxy
-   * @param user the user on whose behalf the proxy is being created
-   * @param token the auth token to use for connection
-   * @param <T> Type information of the proxy
-   * @return Proxy to the RM
-   * @throws IOException on failure
-   */
-  @Public
-  @Unstable
-  public static <T> T createRMProxy(final Configuration configuration,
-      final Class<T> protocol, UserGroupInformation user,
-      final Token<? extends TokenIdentifier> token) throws IOException {
-    try {
-      String rmClusterId = configuration.get(YarnConfiguration.RM_CLUSTER_ID,
-          YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
-      LOG.info("Creating RMProxy to RM {} for protocol {} for user {}",
-          rmClusterId, protocol.getSimpleName(), user);
-      if (token != null) {
-        // preserve the token service sent by the RM when adding the token
-        // to ensure we replace the previous token setup by the RM.
-        // Afterwards we can update the service address for the RPC layer.
-        // Same as YarnServerSecurityUtils.updateAMRMToken()
-        user.addToken(token);
-        token.setService(ClientRMProxy.getAMRMTokenService(configuration));
-        setAuthModeInConf(configuration);
-      }
-      final T proxyConnection = user.doAs(new PrivilegedExceptionAction<T>() {
-        @Override
-        public T run() throws Exception {
-          return ClientRMProxy.createRMProxy(configuration, protocol);
-        }
-      });
-      return proxyConnection;
-
-    } catch (InterruptedException e) {
-      throw new YarnRuntimeException(e);
-    }
-  }
-
-  private static void setAuthModeInConf(Configuration conf) {
-    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
-        SaslRpcServer.AuthMethod.TOKEN.toString());
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
index 15e1cea..23cd3e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
@@ -126,6 +126,7 @@ import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.AMRMClientUtils;
 import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
@@ -158,7 +159,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequ
 import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
-import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java
new file mode 100644
index 0000000..22bb1f9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java
@@ -0,0 +1,275 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for AMRMClientRelayer.
+ */
+public class TestAMRMClientRelayer {
+
+  /**
+   * Mocked ApplicationMasterService in RM. It records the contents of the
+   * last successful allocate call so tests can assert on exactly what the
+   * relayer forwarded. It can also be armed to simulate one RM failover.
+   */
+  public static class MockApplicationMasterService
+      implements ApplicationMasterProtocol {
+
+    // Whether this mockRM will throw failover exception upon next heartbeat
+    // from AM
+    private boolean failover = false;
+    // Contents of the most recent successful allocate call
+    private List<ResourceRequest> lastAsk;
+    private List<ContainerId> lastRelease;
+    private List<String> lastBlacklistAdditions;
+    private List<String> lastBlacklistRemovals;
+
+    @Override
+    public RegisterApplicationMasterResponse registerApplicationMaster(
+        RegisterApplicationMasterRequest request)
+        throws YarnException, IOException {
+      return null;
+    }
+
+    /**
+     * Returns null. If the failover flag is armed, the flag is cleared
+     * instead and the call fails once with
+     * {@link ApplicationMasterNotRegisteredException}.
+     */
+    @Override
+    public FinishApplicationMasterResponse finishApplicationMaster(
+        FinishApplicationMasterRequest request)
+        throws YarnException, IOException {
+      if (this.failover) {
+        this.failover = false;
+        throw new ApplicationMasterNotRegisteredException("Mock RM restarted");
+      }
+      return null;
+    }
+
+    /**
+     * Records the asks, releases and blacklist changes of the request and
+     * returns a minimal response. If the failover flag is armed, the flag is
+     * cleared instead and the call fails once with
+     * {@link ApplicationMasterNotRegisteredException}.
+     */
+    @Override
+    public AllocateResponse allocate(AllocateRequest request)
+        throws YarnException, IOException {
+      if (this.failover) {
+        this.failover = false;
+        throw new ApplicationMasterNotRegisteredException("Mock RM restarted");
+      }
+      this.lastAsk = request.getAskList();
+      this.lastRelease = request.getReleaseList();
+      // A request may carry no blacklist section. Record that as "no changes"
+      // instead of failing the test with an NPE inside the mock.
+      ResourceBlacklistRequest blacklist =
+          request.getResourceBlacklistRequest();
+      this.lastBlacklistAdditions = blacklist == null
+          ? new ArrayList<String>() : blacklist.getBlacklistAdditions();
+      this.lastBlacklistRemovals = blacklist == null
+          ? new ArrayList<String>() : blacklist.getBlacklistRemovals();
+      return AllocateResponse.newInstance(0, null, null,
+          new ArrayList<NodeReport>(), Resource.newInstance(0, 0), null, 0,
+          null, null);
+    }
+
+    /**
+     * Arms the mock so that the next allocate or finishApplicationMaster call
+     * fails once with {@link ApplicationMasterNotRegisteredException}.
+     */
+    public void setFailoverFlag() {
+      this.failover = true;
+    }
+  }
+
+  private Configuration conf;
+  private MockApplicationMasterService mockAMS;
+  private AMRMClientRelayer relayer;
+
+  // Buffer of asks that will be sent to RM in the next AM heartbeat
+  private final List<ResourceRequest> asks = new ArrayList<>();
+  private final List<ContainerId> releases = new ArrayList<>();
+  private final List<String> blacklistAdditions = new ArrayList<>();
+  private final List<String> blacklistRemovals = new ArrayList<>();
+
+  /**
+   * Puts a fresh relayer in front of a new mock RM, registers the AM and
+   * empties the request buffers.
+   */
+  @Before
+  public void setup() throws YarnException, IOException {
+    this.conf = new Configuration();
+
+    this.mockAMS = new MockApplicationMasterService();
+    this.relayer = new AMRMClientRelayer(this.mockAMS);
+
+    this.relayer.init(conf);
+    this.relayer.start();
+
+    this.relayer.registerApplicationMaster(
+        RegisterApplicationMasterRequest.newInstance("", 0, ""));
+
+    clearAllocateRequestLists();
+  }
+
+  /** Asserts the sizes of the ask and release lists last seen by the RM. */
+  private void assertAsksAndReleases(int expectedAsk, int expectedRelease) {
+    Assert.assertEquals(expectedAsk, this.mockAMS.lastAsk.size());
+    Assert.assertEquals(expectedRelease, this.mockAMS.lastRelease.size());
+  }
+
+  /** Asserts the sizes of the blacklist changes last seen by the RM. */
+  private void assertBlacklistAdditionsAndRemovals(int expectedAdditions,
+      int expectedRemovals) {
+    Assert.assertEquals(expectedAdditions,
+        this.mockAMS.lastBlacklistAdditions.size());
+    Assert.assertEquals(expectedRemovals,
+        this.mockAMS.lastBlacklistRemovals.size());
+  }
+
+  /** Builds an allocate request from the current contents of the buffers. */
+  private AllocateRequest getAllocateRequest() {
+    // Need to create a new one every time because rather than directly
+    // referring the lists, the protobuf impl makes a copy of the lists
+    return AllocateRequest.newInstance(0, 0, asks, releases,
+        ResourceBlacklistRequest.newInstance(blacklistAdditions,
+            blacklistRemovals));
+  }
+
+  /** Empties all request buffers; called after each simulated heartbeat. */
+  private void clearAllocateRequestLists() {
+    this.asks.clear();
+    this.releases.clear();
+    this.blacklistAdditions.clear();
+    this.blacklistRemovals.clear();
+  }
+
+  /** Creates a container id under a fixed application attempt. */
+  private static ContainerId createContainerId(int id) {
+    return ContainerId.newContainerId(
+        ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
+        id);
+  }
+
+  /**
+   * Creates a resource request.
+   *
+   * @param id allocation request id
+   * @param resource resource name (a node, a rack or ResourceRequest.ANY)
+   * @param memory memory of the requested container capability
+   * @param vCores virtual cores of the requested container capability
+   * @param priority request priority
+   * @param execType execution type
+   * @param containers number of containers; the tests use 0 to cancel an ask
+   * @return the populated request
+   */
+  protected ResourceRequest createResourceRequest(long id, String resource,
+      int memory, int vCores, int priority, ExecutionType execType,
+      int containers) {
+    ResourceRequest req = Records.newRecord(ResourceRequest.class);
+    req.setAllocationRequestId(id);
+    req.setResourceName(resource);
+    req.setCapability(Resource.newInstance(memory, vCores));
+    req.setPriority(Priority.newInstance(priority));
+    req.setExecutionTypeRequest(ExecutionTypeRequest.newInstance(execType));
+    req.setNumContainers(containers);
+    return req;
+  }
+
+  /**
+   * Test the proper handling of removal/cancel of resource requests.
+   */
+  @Test
+  public void testResourceRequestCleanup() throws YarnException, IOException {
+    // Ask for two containers, one with location preference
+    this.asks.add(createResourceRequest(0, "node", 2048, 1, 1,
+        ExecutionType.GUARANTEED, 1));
+    this.asks.add(createResourceRequest(0, "rack", 2048, 1, 1,
+        ExecutionType.GUARANTEED, 1));
+    this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1,
+        ExecutionType.GUARANTEED, 2));
+    this.relayer.allocate(getAllocateRequest());
+
+    assertAsksAndReleases(3, 0);
+    Assert.assertEquals(1, this.relayer.getRemotePendingAsks().size());
+    ResourceRequestSet set =
+        this.relayer.getRemotePendingAsks().values().iterator().next();
+    Assert.assertEquals(3, set.getAsks().size());
+    clearAllocateRequestLists();
+
+    // Cancel one ask
+    this.asks.add(createResourceRequest(0, "node", 2048, 1, 1,
+        ExecutionType.GUARANTEED, 0));
+    this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1,
+        ExecutionType.GUARANTEED, 1));
+    this.relayer.allocate(getAllocateRequest());
+
+    assertAsksAndReleases(2, 0);
+    Assert.assertEquals(1, relayer.getRemotePendingAsks().size());
+    set = this.relayer.getRemotePendingAsks().values().iterator().next();
+    Assert.assertEquals(2, set.getAsks().size());
+    clearAllocateRequestLists();
+
+    // Cancel the other ask, the pending askSet should be removed. This
+    // request has a null release list and a null blacklist request, yet the
+    // assertion below still expects an empty release list at the RM.
+    this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1,
+        ExecutionType.GUARANTEED, 0));
+    this.relayer.allocate(AllocateRequest.newInstance(0, 0, asks, null, null));
+
+    assertAsksAndReleases(1, 0);
+    Assert.assertEquals(0, this.relayer.getRemotePendingAsks().size());
+  }
+
+  /**
+   * Test the full pending resend after RM fails over.
+   */
+  @Test
+  public void testResendRequestsOnRMRestart()
+      throws YarnException, IOException {
+    ContainerId c1 = createContainerId(1);
+    ContainerId c2 = createContainerId(2);
+
+    // Ask for two containers, one with location preference
+    this.asks.add(createResourceRequest(0, "node1", 2048, 1, 1,
+        ExecutionType.GUARANTEED, 1));
+    this.asks.add(createResourceRequest(0, "rack", 2048, 1, 1,
+        ExecutionType.GUARANTEED, 1));
+    this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1,
+        ExecutionType.GUARANTEED, 2));
+
+    this.releases.add(c1);
+    this.blacklistAdditions.add("node1");
+    this.blacklistRemovals.add("node0");
+
+    // 1. a fully loaded request
+    this.relayer.allocate(getAllocateRequest());
+    assertAsksAndReleases(3, 1);
+    assertBlacklistAdditionsAndRemovals(1, 1);
+    clearAllocateRequestLists();
+
+    // 2. empty request
+    this.relayer.allocate(getAllocateRequest());
+    assertAsksAndReleases(0, 0);
+    assertBlacklistAdditionsAndRemovals(0, 0);
+    clearAllocateRequestLists();
+
+    // Set RM restart and failover flag
+    this.mockAMS.setFailoverFlag();
+
+    // More requests
+    this.blacklistAdditions.add("node2");
+    this.releases.add(c2);
+    this.relayer.allocate(getAllocateRequest());
+
+    // Verify pending requests are fully re-sent after re-register. Expected:
+    // - all 3 outstanding asks;
+    // - both releases (c1 and c2);
+    // - both blacklist additions (node1 and node2);
+    // - no removals, because the earlier removal of node0 is not re-sent.
+    assertAsksAndReleases(3, 2);
+    assertBlacklistAdditionsAndRemovals(2, 0);
+    clearAllocateRequestLists();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index 9a53a50..5740749 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.client.AMRMClientUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -78,7 +79,6 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
-import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
 import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
 import org.apache.hadoop.yarn.util.AsyncCallback;
 import org.apache.hadoop.yarn.util.ConverterUtils;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index ae28879..7dac2cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.client.AMRMClientUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
 import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
@@ -73,7 +74,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.proces
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
-import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
 import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org