You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gi...@apache.org on 2018/09/12 18:47:08 UTC

hadoop git commit: YARN-8658. [AMRMProxy] Metrics for AMRMClientRelayer inside FederationInterceptor. Contributed by Young Chen.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 64c7a12b5 -> 02b9bfdf9


YARN-8658. [AMRMProxy] Metrics for AMRMClientRelayer inside FederationInterceptor. Contributed by Young Chen.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/02b9bfdf
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/02b9bfdf
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/02b9bfdf

Branch: refs/heads/trunk
Commit: 02b9bfdf9e4bd0b3c05ca5fd75399dedcb656e09
Parents: 64c7a12
Author: Giovanni Matteo Fumarola <gi...@apache.org>
Authored: Wed Sep 12 11:46:35 2018 -0700
Committer: Giovanni Matteo Fumarola <gi...@apache.org>
Committed: Wed Sep 12 11:46:35 2018 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/server/AMRMClientRelayer.java   | 412 ++++++++++-----
 .../metrics/AMRMClientRelayerMetrics.java       | 368 +++++++++++++
 .../yarn/server/metrics/package-info.java       |  18 +
 .../yarn/server/uam/UnmanagedAMPoolManager.java |  30 +-
 .../server/uam/UnmanagedApplicationManager.java |  16 +-
 .../yarn/server/TestAMRMClientRelayer.java      |   2 +-
 .../metrics/TestAMRMClientRelayerMetrics.java   | 513 +++++++++++++++++++
 .../uam/TestUnmanagedApplicationManager.java    |   2 +-
 .../amrmproxy/FederationInterceptor.java        |  19 +-
 .../TestableFederationInterceptor.java          |   5 +-
 10 files changed, 1242 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b9bfdf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
index 1e2060c..2621d3e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRespons
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
@@ -52,6 +53,7 @@ import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.metrics.AMRMClientRelayerMetrics;
 import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet;
 import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSetKey;
 import org.slf4j.Logger;
@@ -98,6 +100,15 @@ public class AMRMClientRelayer extends AbstractService
   private Set<ResourceRequest> ask =
       new TreeSet<>(new ResourceRequest.ResourceRequestComparator());
 
+  /**
+   * Data structures for pending and allocate latency metrics. This only applies
+   * for requests with non-zero allocationRequestId.
+   */
+  private Map<Long, Integer> pendingCountForMetrics = new HashMap<>();
+  private Map<Long, Long> askTimeStamp = new HashMap<>();
+  // Set of allocated containerIds, used to avoid double counting
+  private Set<ContainerId> knownContainers = new HashSet<>();
+
   private Set<ContainerId> remotePendingRelease = new HashSet<>();
   private Set<ContainerId> release = new HashSet<>();
 
@@ -108,6 +119,7 @@ public class AMRMClientRelayer extends AbstractService
   private Map<ContainerId, UpdateContainerRequest> remotePendingChange =
       new HashMap<>();
   private Map<ContainerId, UpdateContainerRequest> change = new HashMap<>();
+  private Map<ContainerId, Long> changeTimeStamp = new HashMap<>();
 
   private Map<Set<String>, List<SchedulingRequest>> remotePendingSchedRequest =
       new HashMap<>();
@@ -119,16 +131,26 @@ public class AMRMClientRelayer extends AbstractService
   // heartbeat
   private volatile int resetResponseId;
 
+  private String rmId = "";
+  private volatile boolean shutdown = false;
+
+  private AMRMClientRelayerMetrics metrics;
+
   public AMRMClientRelayer() {
     super(AMRMClientRelayer.class.getName());
     this.resetResponseId = -1;
+    this.metrics = AMRMClientRelayerMetrics.getInstance();
+    this.rmClient = null;
+    this.appId = null;
+    this.rmId = "";
   }
 
   public AMRMClientRelayer(ApplicationMasterProtocol rmClient,
-      ApplicationId appId) {
+      ApplicationId appId, String rmId) {
     this();
     this.rmClient = rmClient;
     this.appId = appId;
+    this.rmId = rmId;
   }
 
   @Override
@@ -155,6 +177,7 @@ public class AMRMClientRelayer extends AbstractService
     if (this.rmClient != null) {
       RPC.stopProxy(this.rmClient);
     }
+    shutdown();
     super.serviceStop();
   }
 
@@ -163,6 +186,49 @@ public class AMRMClientRelayer extends AbstractService
     this.amRegistrationRequest = registerRequest;
   }
 
+  public void setRMClient(ApplicationMasterProtocol client){
+    this.rmClient = client;
+  }
+
+  public void shutdown() {
+    // On finish, clear out our pending count from the metrics
+    // and set the shut down flag so no more pending requests get
+    // added
+    synchronized (this) {
+      if (this.shutdown) {
+        LOG.warn(
+            "Shutdown called twice for AMRMClientRelayer for RM " + this.rmId);
+        return;
+      }
+      this.shutdown = true;
+      for (Map.Entry<ResourceRequestSetKey, ResourceRequestSet> entry
+          : this.remotePendingAsks .entrySet()) {
+        ResourceRequestSetKey key = entry.getKey();
+        if (key.getAllocationRequestId() == 0) {
+          this.metrics.decrClientPending(this.rmId,
+              AMRMClientRelayerMetrics.getRequestType(key.getExeType()),
+              entry.getValue().getNumContainers());
+        } else {
+          this.askTimeStamp.remove(key.getAllocationRequestId());
+          Integer pending =
+              this.pendingCountForMetrics.remove(key.getAllocationRequestId());
+          if (pending == null) {
+            throw new YarnRuntimeException(
+                "pendingCountForMetrics not found for key " + key
+                    + " during shutdown");
+          }
+          this.metrics.decrClientPending(this.rmId,
+              AMRMClientRelayerMetrics.getRequestType(key.getExeType()),
+              pending);
+        }
+      }
+      for(UpdateContainerRequest req : remotePendingChange.values()) {
+        this.metrics
+            .decrClientPending(rmId, req.getContainerUpdateType(), 1);
+      }
+    }
+  }
+
   @Override
   public RegisterApplicationMasterResponse registerApplicationMaster(
       RegisterApplicationMasterRequest request)
@@ -178,7 +244,8 @@ public class AMRMClientRelayer extends AbstractService
     try {
       return this.rmClient.finishApplicationMaster(request);
     } catch (ApplicationMasterNotRegisteredException e) {
-      LOG.warn("Out of sync with RM for " + this.appId + ", hence resyncing.");
+      LOG.warn("Out of sync with RM " + rmId
+          + " for " + this.appId + ", hence resyncing.");
       // re register with RM
       registerApplicationMaster(this.amRegistrationRequest);
       return finishApplicationMaster(request);
@@ -215,7 +282,23 @@ public class AMRMClientRelayer extends AbstractService
     if (allocateRequest.getUpdateRequests() != null) {
       for (UpdateContainerRequest update : allocateRequest
           .getUpdateRequests()) {
-        this.remotePendingChange.put(update.getContainerId(), update);
+        UpdateContainerRequest req =
+            this.remotePendingChange.put(update.getContainerId(), update);
+        this.changeTimeStamp
+            .put(update.getContainerId(), System.currentTimeMillis());
+        if (req == null) {
+          // If this is a brand new request, all we have to do is increment
+          this.metrics
+              .incrClientPending(rmId, update.getContainerUpdateType(), 1);
+        } else if (req.getContainerUpdateType() != update
+            .getContainerUpdateType()) {
+          // If this is replacing a request with a different update type, we
+          // need to decrement the replaced type
+          this.metrics
+              .decrClientPending(rmId, req.getContainerUpdateType(), 1);
+          this.metrics
+              .incrClientPending(rmId, update.getContainerUpdateType(), 1);
+        }
         this.change.put(update.getContainerId(), update);
       }
     }
@@ -232,141 +315,196 @@ public class AMRMClientRelayer extends AbstractService
   public AllocateResponse allocate(AllocateRequest allocateRequest)
       throws YarnException, IOException {
     AllocateResponse allocateResponse = null;
-    try {
-      synchronized (this) {
-        addNewAllocateRequest(allocateRequest);
+    long startTime = System.currentTimeMillis();
+    synchronized (this) {
+      if(this.shutdown){
+        throw new YarnException("Allocate called after AMRMClientRelayer for "
+            + "RM " + rmId + " shutdown.");
+      }
+      addNewAllocateRequest(allocateRequest);
 
-        ArrayList<ResourceRequest> askList = new ArrayList<>(ask.size());
-        for (ResourceRequest r : ask) {
-          // create a copy of ResourceRequest as we might change it while the
-          // RPC layer is using it to send info across
-          askList.add(ResourceRequest.clone(r));
-        }
+      ArrayList<ResourceRequest> askList = new ArrayList<>(ask.size());
+      for (ResourceRequest r : ask) {
+        // create a copy of ResourceRequest as we might change it while the
+        // RPC layer is using it to send info across
+        askList.add(ResourceRequest.clone(r));
+      }
 
-        allocateRequest = AllocateRequest.newBuilder()
-            .responseId(allocateRequest.getResponseId())
-            .progress(allocateRequest.getProgress()).askList(askList)
-            .releaseList(new ArrayList<>(this.release))
-            .resourceBlacklistRequest(ResourceBlacklistRequest.newInstance(
-                new ArrayList<>(this.blacklistAdditions),
-                new ArrayList<>(this.blacklistRemovals)))
-            .updateRequests(new ArrayList<>(this.change.values()))
-            .schedulingRequests(new ArrayList<>(this.schedulingRequest))
-            .build();
-
-        if (this.resetResponseId != -1) {
-          LOG.info("Override allocate responseId from "
-              + allocateRequest.getResponseId() + " to " + this.resetResponseId
-              + " for " + this.appId);
-          allocateRequest.setResponseId(this.resetResponseId);
-        }
+      allocateRequest = AllocateRequest.newBuilder()
+          .responseId(allocateRequest.getResponseId())
+          .progress(allocateRequest.getProgress()).askList(askList)
+          .releaseList(new ArrayList<>(this.release))
+          .resourceBlacklistRequest(ResourceBlacklistRequest.newInstance(
+              new ArrayList<>(this.blacklistAdditions),
+              new ArrayList<>(this.blacklistRemovals)))
+          .updateRequests(new ArrayList<>(this.change.values()))
+          .schedulingRequests(new ArrayList<>(this.schedulingRequest))
+          .build();
+
+      if (this.resetResponseId != -1) {
+        LOG.info("Override allocate responseId from "
+            + allocateRequest.getResponseId() + " to " + this.resetResponseId
+            + " for " + this.appId);
+        allocateRequest.setResponseId(this.resetResponseId);
       }
+    }
 
-      // Do the actual allocate call
-      try {
-        allocateResponse = this.rmClient.allocate(allocateRequest);
-
-        // Heartbeat succeeded, wipe out responseId overriding
-        this.resetResponseId = -1;
-      } catch (ApplicationMasterNotRegisteredException e) {
-        LOG.warn("ApplicationMaster is out of sync with RM for " + this.appId
-            + " hence resyncing.");
-
-        synchronized (this) {
-          // Add all remotePending data into to-send data structures
-          for (ResourceRequestSet requestSet : this.remotePendingAsks
-              .values()) {
-            for (ResourceRequest rr : requestSet.getRRs()) {
-              addResourceRequestToAsk(rr);
-            }
-          }
-          this.release.addAll(this.remotePendingRelease);
-          this.blacklistAdditions.addAll(this.remoteBlacklistedNodes);
-          this.change.putAll(this.remotePendingChange);
-          for (List<SchedulingRequest> reqs : this.remotePendingSchedRequest
-              .values()) {
-            this.schedulingRequest.addAll(reqs);
+    // Do the actual allocate call
+    try {
+      allocateResponse = this.rmClient.allocate(allocateRequest);
+
+      // Heartbeat succeeded, wipe out responseId overriding
+      this.resetResponseId = -1;
+    } catch (ApplicationMasterNotRegisteredException e) {
+      // This is a retriable exception - we will re-register and make a
+      // recursive call to retry
+      LOG.warn("ApplicationMaster is out of sync with RM " + rmId
+          + " for " + this.appId + ", hence resyncing.");
+
+      this.metrics.incrRMMasterSlaveSwitch(this.rmId);
+
+      synchronized (this) {
+        // Add all remotePending data into to-send data structures
+        for (ResourceRequestSet requestSet : this.remotePendingAsks
+            .values()) {
+          for (ResourceRequest rr : requestSet.getRRs()) {
+            addResourceRequestToAsk(rr);
           }
         }
+        this.release.addAll(this.remotePendingRelease);
+        this.blacklistAdditions.addAll(this.remoteBlacklistedNodes);
+        this.change.putAll(this.remotePendingChange);
+        for (List<SchedulingRequest> reqs : this.remotePendingSchedRequest
+            .values()) {
+          this.schedulingRequest.addAll(reqs);
+        }
+      }
 
-        // re-register with RM, then retry allocate recursively
-        registerApplicationMaster(this.amRegistrationRequest);
-        // Reset responseId after re-register
-        allocateRequest.setResponseId(0);
-        return allocate(allocateRequest);
-      } catch (Throwable t) {
-
-        // If RM is complaining about responseId out of sync, force reset next
-        // time
-        if (t instanceof InvalidApplicationMasterRequestException) {
-          int responseId = AMRMClientUtils
-              .parseExpectedResponseIdFromException(t.getMessage());
-          if (responseId != -1) {
-            this.resetResponseId = responseId;
-            LOG.info("ResponseId out of sync with RM, expect " + responseId
-                + " but " + allocateRequest.getResponseId() + " used by "
-                + this.appId + ". Will override in the next allocate.");
-          } else {
-            LOG.warn("Failed to parse expected responseId out of exception for "
-                + this.appId);
-          }
+      // re-register with RM, then retry allocate recursively
+      registerApplicationMaster(this.amRegistrationRequest);
+      // Reset responseId after re-register
+      allocateRequest.setResponseId(0);
+      allocateResponse = allocate(allocateRequest);
+      return allocateResponse;
+    } catch (Throwable t) {
+      // Unexpected exception - rethrow and increment heart beat failure metric
+      this.metrics.addHeartbeatFailure(this.rmId,
+          System.currentTimeMillis() - startTime);
+
+      // If RM is complaining about responseId out of sync, force reset next
+      // time
+      if (t instanceof InvalidApplicationMasterRequestException) {
+        int responseId = AMRMClientUtils
+            .parseExpectedResponseIdFromException(t.getMessage());
+        if (responseId != -1) {
+          this.resetResponseId = responseId;
+          LOG.info("ResponseId out of sync with RM, expect " + responseId
+              + " but " + allocateRequest.getResponseId() + " used by "
+              + this.appId + ". Will override in the next allocate.");
+        } else {
+          LOG.warn("Failed to parse expected responseId out of exception for "
+              + this.appId);
         }
+      }
+
+      throw t;
+    }
 
-        throw t;
+    synchronized (this) {
+      if (this.shutdown) {
+        throw new YarnException("Allocate call succeeded for " + this.appId
+            + " after AMRMClientRelayer for RM " + rmId + " shutdown.");
       }
 
-      synchronized (this) {
-        // Process the allocate response from RM
-        if (allocateResponse.getCompletedContainersStatuses() != null) {
-          for (ContainerStatus container : allocateResponse
-              .getCompletedContainersStatuses()) {
-            this.remotePendingRelease.remove(container.getContainerId());
-            this.remotePendingChange.remove(container.getContainerId());
-          }
-        }
+      updateMetrics(allocateResponse, startTime);
 
-        if (allocateResponse.getUpdatedContainers() != null) {
-          for (UpdatedContainer updatedContainer : allocateResponse
-              .getUpdatedContainers()) {
-            this.remotePendingChange
-                .remove(updatedContainer.getContainer().getId());
+      AMRMClientUtils.removeFromOutstandingSchedulingRequests(
+          allocateResponse.getAllocatedContainers(),
+          this.remotePendingSchedRequest);
+      AMRMClientUtils.removeFromOutstandingSchedulingRequests(
+          allocateResponse.getContainersFromPreviousAttempts(),
+          this.remotePendingSchedRequest);
+
+      this.ask.clear();
+      this.release.clear();
+
+      this.blacklistAdditions.clear();
+      this.blacklistRemovals.clear();
+
+      this.change.clear();
+      this.schedulingRequest.clear();
+      return allocateResponse;
+    }
+  }
+
+  private void updateMetrics(AllocateResponse allocateResponse,
+      long startTime) {
+    this.metrics.addHeartbeatSuccess(this.rmId,
+        System.currentTimeMillis() - startTime);
+    // Process the allocate response from RM
+    if (allocateResponse.getAllocatedContainers() != null) {
+      for (Container container : allocateResponse
+          .getAllocatedContainers()) {
+        // Do not update metrics aggressively for AllocationRequestId zero
+        // case. Also avoid double counting due to re-send
+        if (this.knownContainers.add(container.getId())) {
+          this.metrics.addFulfilledQPS(this.rmId, AMRMClientRelayerMetrics
+              .getRequestType(container.getExecutionType()), 1);
+          if (container.getAllocationRequestId() != 0) {
+            Integer count = this.pendingCountForMetrics
+                .get(container.getAllocationRequestId());
+            if (count != null && count > 0) {
+              this.pendingCountForMetrics
+                  .put(container.getAllocationRequestId(), --count);
+              this.metrics.decrClientPending(this.rmId,
+                  AMRMClientRelayerMetrics
+                      .getRequestType(container.getExecutionType()), 1);
+              this.metrics.addFulfillLatency(this.rmId,
+                  AMRMClientRelayerMetrics
+                      .getRequestType(container.getExecutionType()),
+                  System.currentTimeMillis() - this.askTimeStamp
+                      .get(container.getAllocationRequestId()));
+            }
           }
         }
-
-        AMRMClientUtils.removeFromOutstandingSchedulingRequests(
-            allocateResponse.getAllocatedContainers(),
-            this.remotePendingSchedRequest);
-        AMRMClientUtils.removeFromOutstandingSchedulingRequests(
-            allocateResponse.getContainersFromPreviousAttempts(),
-            this.remotePendingSchedRequest);
       }
+    }
+    if (allocateResponse.getCompletedContainersStatuses() != null) {
+      for (ContainerStatus container : allocateResponse
+          .getCompletedContainersStatuses()) {
+        this.remotePendingRelease.remove(container.getContainerId());
+        UpdateContainerRequest req =
+            this.remotePendingChange.remove(container.getContainerId());
+        if (req != null) {
+          this.metrics
+              .decrClientPending(rmId, req.getContainerUpdateType(), 1);
+        }
+        this.knownContainers.remove(container.getContainerId());
+      }
+    }
 
-    } finally {
-      synchronized (this) {
-        /*
-         * If allocateResponse is null, it means exception happened and RM did
-         * not accept the request. Don't clear any data structures so that they
-         * will be re-sent next time.
-         *
-         * Otherwise request was accepted by RM, we are safe to clear these.
-         */
-        if (allocateResponse != null) {
-          this.ask.clear();
-          this.release.clear();
-
-          this.blacklistAdditions.clear();
-          this.blacklistRemovals.clear();
-
-          this.change.clear();
-          this.schedulingRequest.clear();
+    if (allocateResponse.getUpdatedContainers() != null) {
+      for (UpdatedContainer updatedContainer : allocateResponse
+          .getUpdatedContainers()) {
+        UpdateContainerRequest req = this.remotePendingChange
+            .remove(updatedContainer.getContainer().getId());
+        if (req != null) {
+          this.metrics
+              .decrClientPending(rmId, req.getContainerUpdateType(), 1);
+          this.metrics.addFulfillLatency(rmId, req.getContainerUpdateType(),
+              System.currentTimeMillis() - this.changeTimeStamp
+                  .remove(req.getContainerId()));
+          this.metrics
+              .addFulfilledQPS(rmId, req.getContainerUpdateType(), 1);
         }
       }
     }
-    return allocateResponse;
+
   }
 
   private void addNewAsks(List<ResourceRequest> asks) throws YarnException {
     Set<ResourceRequestSetKey> touchedKeys = new HashSet<>();
+    Set<ResourceRequestSetKey> nonZeroNewKeys = new HashSet<>();
     for (ResourceRequest rr : asks) {
       addResourceRequestToAsk(rr);
 
@@ -377,8 +515,38 @@ public class AMRMClientRelayer extends AbstractService
       if (askSet == null) {
         askSet = new ResourceRequestSet(key);
         this.remotePendingAsks.put(key, askSet);
+        if (key.getAllocationRequestId() != 0) {
+          nonZeroNewKeys.add(key);
+        }
       }
+
+      int numContainers = askSet.getNumContainers();
       askSet.addAndOverrideRR(rr);
+      int deltaContainers = askSet.getNumContainers() - numContainers;
+
+      if (key.getAllocationRequestId() == 0) {
+        // AllocationRequestId is zero, keep track of pending count in the
+        // delayed but correct way. Allocation latency is not supported
+        if (deltaContainers != 0) {
+          this.metrics.incrClientPending(this.rmId,
+              AMRMClientRelayerMetrics.getRequestType(key.getExeType()),
+              deltaContainers);
+          if(deltaContainers > 0){
+            this.metrics.addRequestedQPS(this.rmId,
+                AMRMClientRelayerMetrics.getRequestType(key.getExeType()),
+                deltaContainers);
+          }
+        }
+      } else {
+        // AllocationRequestId is non-zero, we do pending decrement and latency
+        // aggressively. So don't update metrics here. Double check AM is not
+        // reusing the requestId for more asks
+        if (deltaContainers > 0 && numContainers != 0) {
+          throw new YarnException("Received new ask ("
+              + askSet.getNumContainers() + ") on top of existing ("
+              + numContainers + ") in key " + key);
+        }
+      }
     }
 
     // Cleanup properly if needed
@@ -391,6 +559,20 @@ public class AMRMClientRelayer extends AbstractService
         askSet.cleanupZeroNonAnyRR();
       }
     }
+
+    // Initialize data for pending metrics for each new key
+    for (ResourceRequestSetKey key : nonZeroNewKeys) {
+      if(remotePendingAsks.containsKey(key)){
+        this.askTimeStamp.put(key.getAllocationRequestId(),
+            System.currentTimeMillis());
+        int count = this.remotePendingAsks.get(key).getNumContainers();
+        this.pendingCountForMetrics.put(key.getAllocationRequestId(), count);
+        this.metrics.incrClientPending(this.rmId,
+            AMRMClientRelayerMetrics.getRequestType(key.getExeType()), count);
+        this.metrics.addRequestedQPS(this.rmId,
+            AMRMClientRelayerMetrics.getRequestType(key.getExeType()), count);
+      }
+    }
   }
 
   private void addResourceRequestToAsk(ResourceRequest remoteRequest) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b9bfdf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/AMRMClientRelayerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/AMRMClientRelayerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/AMRMClientRelayerMetrics.java
new file mode 100644
index 0000000..6ce5851
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/AMRMClientRelayerMetrics.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+
+/**
+ * Metrics for the AMRMClientRelayer used inside FederationInterceptor.
+ */
+@InterfaceAudience.Private
+@Metrics(about = "Performance and usage metrics for YARN AMRMClientRelayer",
+    context = "fedr")
+public final class AMRMClientRelayerMetrics implements MetricsSource{
+
+  /**
+   * Easier classification of request types for logging metrics.
+   */
+  public enum RequestType {
+    Guaranteed, Opportunistic, Promote, Demote;
+
+    @Override
+    public String toString() {
+      switch (this) {
+      case Guaranteed:
+        return "G";
+      case Opportunistic:
+        return "O";
+      case Promote:
+        return "P";
+      case Demote:
+        return "D";
+      default:
+        throw new IllegalArgumentException();
+      }
+    }
+  }
+
+  private static AtomicBoolean isInitialized = new AtomicBoolean(false);
+
+  private static final MetricsInfo RECORD_INFO =
+      info("AMRMClientRelayerMetrics",
+          "Metrics for the Yarn AMRMClientRelayer");
+
+  private static volatile AMRMClientRelayerMetrics instance = null;
+  private static MetricsRegistry registry;
+
+  // The metrics are set up as a map from string (typically sub cluster id) to
+  // request type (Guaranteed, Opp, Promote, Demote) to the counter.
+  // The counters are constructed lazily when the first metric entry
+  // comes in.
+  // For some metrics, request type is not applicable.
+  private final Map<String, Map<RequestType, MutableGaugeLong>>
+      rmClientPending = new ConcurrentHashMap<>();
+
+  private final Map<String, Map<RequestType, MutableQuantiles>> fulfillLatency =
+      new ConcurrentHashMap<>();
+
+  private final Map<String, Map<RequestType, MutableGaugeLong>>
+      requestedQps = new ConcurrentHashMap<>();
+
+  private final Map<String, Map<RequestType, MutableGaugeLong>>
+      fulfilledQps = new ConcurrentHashMap<>();
+
+  private final Map<String, MutableGaugeLong> rmMasterSlaveSwitch =
+      new ConcurrentHashMap<>();
+
+  private final Map<String, MutableGaugeLong> heartbeatFailure =
+      new ConcurrentHashMap<>();
+
+  private final Map<String, MutableGaugeLong> heartbeatSuccess =
+      new ConcurrentHashMap<>();
+  private final Map<String, MutableQuantiles> heartbeatLatency =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Initialize the singleton instance.
+   *
+   * @return the singleton
+   */
+  public static AMRMClientRelayerMetrics getInstance() {
+    if (!isInitialized.get()) {
+      synchronized (AMRMClientRelayerMetrics.class) {
+        if (instance == null) {
+          instance = new AMRMClientRelayerMetrics();
+          DefaultMetricsSystem.instance().register(RECORD_INFO.name(),
+              RECORD_INFO.description(), instance);
+          isInitialized.set(true);
+        }
+      }
+    }
+    return instance;
+  }
+
+  private AMRMClientRelayerMetrics()  {
+    registry = new MetricsRegistry(RECORD_INFO);
+    registry.tag(RECORD_INFO, "AMRMClientRelayer");
+  }
+
+  public static RequestType getRequestType(ExecutionType execType) {
+    if (execType == null || execType.equals(ExecutionType.GUARANTEED)) {
+      return RequestType.Guaranteed;
+    }
+    return RequestType.Opportunistic;
+  }
+
+  /**
+   * Returns the pending gauge for an instance and request type, creating
+   * and registering it (initial value 0) the first time it is requested.
+   *
+   * @param instanceId id used as the map key and as the metric name suffix
+   * @param type request type of the gauge
+   * @return the gauge stored for (instanceId, type)
+   */
+  @VisibleForTesting
+  protected MutableGaugeLong getPendingMetric(String instanceId,
+      RequestType type) {
+    synchronized (rmClientPending) {
+      Map<RequestType, MutableGaugeLong> perType =
+          rmClientPending.get(instanceId);
+      if (perType == null) {
+        perType = new ConcurrentHashMap<RequestType, MutableGaugeLong>();
+        rmClientPending.put(instanceId, perType);
+      }
+      if (!perType.containsKey(type)) {
+        String gaugeName = type.toString() + "Pending" + instanceId;
+        String gaugeDesc = "Remove pending of " + type + " for " + instanceId;
+        perType.put(type, registry.newGauge(gaugeName, gaugeDesc, 0L));
+      }
+    }
+    return rmClientPending.get(instanceId).get(type);
+  }
+
+  public void incrClientPending(String instanceId, RequestType type, int diff) {
+    getPendingMetric(instanceId, type).incr(diff);
+  }
+
+  public void decrClientPending(String instanceId, RequestType type, int diff) {
+    getPendingMetric(instanceId, type).decr(diff);
+  }
+
+  /**
+   * Overwrites the pending gauge of the given instance and request type.
+   *
+   * @param instanceId id of the instance the gauge belongs to
+   * @param type request type of the gauge
+   * @param val new gauge value
+   */
+  @VisibleForTesting
+  protected void setClientPending(String instanceId, RequestType type,
+      int val) {
+    MutableGaugeLong pending = getPendingMetric(instanceId, type);
+    pending.set(val);
+  }
+
+  /**
+   * Returns the fulfill-latency quantiles for an instance and request type,
+   * creating and registering them (interval argument 60) on first request.
+   *
+   * @param instanceId id used as the map key and as the metric name suffix
+   * @param type request type of the quantiles
+   * @return the quantiles stored for (instanceId, type)
+   */
+  @VisibleForTesting
+  protected MutableQuantiles getFulfillLatencyMetric(String instanceId,
+      RequestType type) {
+    synchronized (fulfillLatency) {
+      Map<RequestType, MutableQuantiles> perType =
+          fulfillLatency.get(instanceId);
+      if (perType == null) {
+        perType = new ConcurrentHashMap<RequestType, MutableQuantiles>();
+        fulfillLatency.put(instanceId, perType);
+      }
+      if (!perType.containsKey(type)) {
+        String metricName = type.toString() + "FulfillLatency" + instanceId;
+        String metricDesc = "FulfillLatency of " + type + " for " + instanceId;
+        perType.put(type, registry
+            .newQuantiles(metricName, metricDesc, "ops", "latency", 60));
+      }
+    }
+    return fulfillLatency.get(instanceId).get(type);
+  }
+
+  /**
+   * Records one fulfill-latency sample for an instance and request type.
+   *
+   * @param instanceId id of the instance the quantiles belong to
+   * @param type request type of the quantiles
+   * @param latency sample value to add
+   */
+  public void addFulfillLatency(String instanceId, RequestType type,
+      long latency) {
+    MutableQuantiles quantiles = getFulfillLatencyMetric(instanceId, type);
+    quantiles.add(latency);
+  }
+
+  /**
+   * Records one fulfill-latency sample for a container update.
+   * DEMOTE_EXECUTION_TYPE is recorded as Demote and PROMOTE_EXECUTION_TYPE
+   * as Promote; every other update type is ignored.
+   *
+   * @param instanceId id of the instance the quantiles belong to
+   * @param type container update type
+   * @param latency sample value to add
+   */
+  public void addFulfillLatency(String instanceId, ContainerUpdateType type,
+      long latency) {
+    final RequestType mapped;
+    switch(type) {
+    case DEMOTE_EXECUTION_TYPE:
+      mapped = RequestType.Demote;
+      break;
+    case PROMOTE_EXECUTION_TYPE:
+      mapped = RequestType.Promote;
+      break;
+    default:
+      // Other update types are not tracked.
+      return;
+    }
+    addFulfillLatency(instanceId, mapped, latency);
+  }
+
+  /**
+   * Returns the requested-operations gauge for an instance and request
+   * type, creating and registering it (initial value 0) on first request.
+   *
+   * @param instanceId id used as the map key and as the metric name suffix
+   * @param type request type of the gauge
+   * @return the gauge stored for (instanceId, type)
+   */
+  @VisibleForTesting
+  protected MutableGaugeLong getRequestedQPSMetric(String instanceId,
+      RequestType type) {
+    synchronized (requestedQps) {
+      Map<RequestType, MutableGaugeLong> perType =
+          requestedQps.get(instanceId);
+      if (perType == null) {
+        perType = new ConcurrentHashMap<RequestType, MutableGaugeLong>();
+        requestedQps.put(instanceId, perType);
+      }
+      if (!perType.containsKey(type)) {
+        perType.put(type, registry.newGauge(
+            info(type.toString() + "RequestedOps" + instanceId,
+                "Requested operations of " + type + " for " + instanceId),
+            0L));
+      }
+    }
+    return requestedQps.get(instanceId).get(type);
+  }
+
+  /**
+   * Adds to the requested-operations gauge of an instance and request type.
+   *
+   * @param instanceId id of the instance the gauge belongs to
+   * @param type request type of the gauge
+   * @param numEntries amount to add
+   */
+  public void addRequestedQPS(String instanceId, RequestType type,
+      long numEntries) {
+    MutableGaugeLong requested = getRequestedQPSMetric(instanceId, type);
+    requested.incr(numEntries);
+  }
+
+  /**
+   * Returns the fulfilled-operations gauge for an instance and request
+   * type, creating and registering it (initial value 0) on first request.
+   *
+   * @param instanceId id used as the map key and as the metric name suffix
+   * @param type request type of the gauge
+   * @return the gauge stored for (instanceId, type)
+   */
+  @VisibleForTesting
+  protected MutableGaugeLong getFulfilledQPSMetric(String instanceId,
+      RequestType type) {
+    synchronized (fulfilledQps) {
+      Map<RequestType, MutableGaugeLong> perType =
+          fulfilledQps.get(instanceId);
+      if (perType == null) {
+        perType = new ConcurrentHashMap<RequestType, MutableGaugeLong>();
+        fulfilledQps.put(instanceId, perType);
+      }
+      if (!perType.containsKey(type)) {
+        perType.put(type, registry.newGauge(
+            info(type.toString() + "FulfilledOps" + instanceId,
+                "Fulfilled operations of " + type + " for " + instanceId),
+            0L));
+      }
+    }
+    return fulfilledQps.get(instanceId).get(type);
+  }
+
+  /**
+   * Adds to the fulfilled-operations gauge of an instance and request type.
+   *
+   * @param instanceId id of the instance the gauge belongs to
+   * @param type request type of the gauge
+   * @param numEntries amount to add
+   */
+  public void addFulfilledQPS(String instanceId, RequestType type,
+      long numEntries) {
+    MutableGaugeLong fulfilled = getFulfilledQPSMetric(instanceId, type);
+    fulfilled.incr(numEntries);
+  }
+
+  /**
+   * Adds to the fulfilled-operations gauge for a container update.
+   * DEMOTE_EXECUTION_TYPE is counted as Demote and PROMOTE_EXECUTION_TYPE
+   * as Promote; every other update type is ignored.
+   *
+   * @param instanceId id of the instance the gauge belongs to
+   * @param type container update type
+   * @param numEntries amount to add to the gauge (a count, not a latency)
+   */
+  public void addFulfilledQPS(String instanceId, ContainerUpdateType type,
+      long numEntries) {
+    switch(type) {
+    case DEMOTE_EXECUTION_TYPE:
+      addFulfilledQPS(instanceId, RequestType.Demote, numEntries);
+      break;
+    case PROMOTE_EXECUTION_TYPE:
+      addFulfilledQPS(instanceId, RequestType.Promote, numEntries);
+      break;
+    default:
+      // Other update types are not tracked.
+      break;
+    }
+  }
+
+  /**
+   * Increases the pending gauge for a container update.
+   * DEMOTE_EXECUTION_TYPE is counted as Demote and PROMOTE_EXECUTION_TYPE
+   * as Promote; every other update type is ignored.
+   *
+   * @param scId id of the instance the gauge belongs to
+   * @param type container update type
+   * @param diff amount to add
+   */
+  public void incrClientPending(String scId, ContainerUpdateType type,
+      int diff) {
+    final RequestType mapped;
+    switch(type) {
+    case DEMOTE_EXECUTION_TYPE:
+      mapped = RequestType.Demote;
+      break;
+    case PROMOTE_EXECUTION_TYPE:
+      mapped = RequestType.Promote;
+      break;
+    default:
+      // Other update types are not tracked.
+      return;
+    }
+    incrClientPending(scId, mapped, diff);
+  }
+
+  /**
+   * Decreases the pending gauge for a container update.
+   * DEMOTE_EXECUTION_TYPE is counted as Demote and PROMOTE_EXECUTION_TYPE
+   * as Promote; every other update type is ignored.
+   *
+   * @param scId id of the instance the gauge belongs to
+   * @param type container update type
+   * @param diff amount to subtract
+   */
+  public void decrClientPending(String scId, ContainerUpdateType type,
+      int diff) {
+    final RequestType mapped;
+    switch(type) {
+    case DEMOTE_EXECUTION_TYPE:
+      mapped = RequestType.Demote;
+      break;
+    case PROMOTE_EXECUTION_TYPE:
+      mapped = RequestType.Promote;
+      break;
+    default:
+      // Other update types are not tracked.
+      return;
+    }
+    decrClientPending(scId, mapped, diff);
+  }
+
+  /**
+   * Returns the RM master/slave switch gauge for an instance, creating and
+   * registering it (initial value 0) the first time it is requested.
+   *
+   * @param instanceId id used as the map key and as the metric name suffix
+   * @return the gauge stored for instanceId
+   */
+  @VisibleForTesting
+  protected MutableGaugeLong getRMMasterSlaveSwitchMetric(
+      String instanceId) {
+    synchronized (rmMasterSlaveSwitch) {
+      MutableGaugeLong existing = rmMasterSlaveSwitch.get(instanceId);
+      if (existing == null) {
+        rmMasterSlaveSwitch.put(instanceId, registry.newGauge(
+            info("RMMasterSlaveSwitch" + instanceId,
+                "Number of RM master slave switch"), 0L));
+      }
+    }
+    return rmMasterSlaveSwitch.get(instanceId);
+  }
+
+  public void incrRMMasterSlaveSwitch(String instanceId) {
+    getRMMasterSlaveSwitchMetric(instanceId).incr();
+  }
+
+  /**
+   * Returns the heartbeat-latency quantiles for an instance, creating and
+   * registering them (interval argument 60) the first time they are
+   * requested.
+   *
+   * @param instanceId id used as the map key and as the metric name suffix
+   * @return the quantiles stored for instanceId
+   */
+  @VisibleForTesting
+  protected MutableQuantiles getHeartbeatLatencyMetric(String instanceId) {
+    synchronized (heartbeatLatency) {
+      MutableQuantiles existing = heartbeatLatency.get(instanceId);
+      if (existing == null) {
+        String metricName = "HeartbeatLatency" + instanceId;
+        String metricDesc = "HeartbeatLatency for " + instanceId;
+        heartbeatLatency.put(instanceId, registry
+            .newQuantiles(metricName, metricDesc, "ops", "latency", 60));
+      }
+    }
+    return heartbeatLatency.get(instanceId);
+  }
+
+  /**
+   * Returns the heartbeat-failure gauge for an instance, creating and
+   * registering it (initial value 0) the first time it is requested.
+   *
+   * @param instanceId id used as the map key and as the metric name suffix
+   * @return the gauge stored for instanceId
+   */
+  @VisibleForTesting
+  protected MutableGaugeLong getHeartbeatFailureMetric(
+      String instanceId) {
+    synchronized (heartbeatFailure) {
+      MutableGaugeLong existing = heartbeatFailure.get(instanceId);
+      if (existing == null) {
+        heartbeatFailure.put(instanceId, registry.newGauge(
+            info("HeartbeatFailure" + instanceId,
+                "Number of Heartbeat Failures"), 0L));
+      }
+    }
+    return heartbeatFailure.get(instanceId);
+  }
+
+  public void addHeartbeatFailure(String instanceId, long latency) {
+    getHeartbeatFailureMetric(instanceId).incr();
+
+    getHeartbeatLatencyMetric(instanceId).add(latency);
+  }
+
+  /**
+   * Returns the heartbeat-success gauge for an instance, creating and
+   * registering it (initial value 0) the first time it is requested.
+   *
+   * @param instanceId id used as the map key and as the metric name suffix
+   * @return the gauge stored for instanceId
+   */
+  @VisibleForTesting
+  protected MutableGaugeLong getHeartbeatSuccessMetric(
+      String instanceId) {
+    synchronized (heartbeatSuccess) {
+      MutableGaugeLong existing = heartbeatSuccess.get(instanceId);
+      if (existing == null) {
+        heartbeatSuccess.put(instanceId, registry.newGauge(
+            info("HeartbeatSuccess" + instanceId,
+                "Number of Heartbeat"), 0L));
+      }
+    }
+    return heartbeatSuccess.get(instanceId);
+  }
+
+  public void addHeartbeatSuccess(String instanceId, long latency) {
+    getHeartbeatSuccessMetric(instanceId).incr();
+
+    getHeartbeatLatencyMetric(instanceId).add(latency);
+  }
+
+  @Override
+  public void getMetrics(MetricsCollector builder, boolean all) {
+    registry.snapshot(builder.addRecord(registry.info().name()), all);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b9bfdf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/package-info.java
new file mode 100644
index 0000000..a67cf93
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.metrics;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b9bfdf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
index 5f9d81b..d708ced 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
@@ -150,6 +150,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
    * @param appNameSuffix application name suffix for the UAM
    * @param keepContainersAcrossApplicationAttempts keep container flag for UAM
    *          recovery.
+   * @param rmName name of the YarnRM
    * @see ApplicationSubmissionContext
    *          #setKeepContainersAcrossApplicationAttempts(boolean)
    * @return uamId for the UAM
@@ -159,7 +160,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
   public String createAndRegisterNewUAM(
       RegisterApplicationMasterRequest registerRequest, Configuration conf,
       String queueName, String submitter, String appNameSuffix,
-      boolean keepContainersAcrossApplicationAttempts)
+      boolean keepContainersAcrossApplicationAttempts, String rmName)
       throws YarnException, IOException {
     ApplicationId appId = null;
     ApplicationClientProtocol rmClient;
@@ -183,7 +184,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
 
     // Launch the UAM in RM
     launchUAM(appId.toString(), conf, appId, queueName, submitter,
-        appNameSuffix, keepContainersAcrossApplicationAttempts);
+        appNameSuffix, keepContainersAcrossApplicationAttempts, rmName);
 
     // Register the UAM application
     registerApplicationMaster(appId.toString(), registerRequest);
@@ -203,6 +204,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
    * @param appNameSuffix application name suffix for the UAM
    * @param keepContainersAcrossApplicationAttempts keep container flag for UAM
    *          recovery.
+   * @param rmName name of the YarnRM
    * @see ApplicationSubmissionContext
    *          #setKeepContainersAcrossApplicationAttempts(boolean)
    * @return UAM token
@@ -211,14 +213,15 @@ public class UnmanagedAMPoolManager extends AbstractService {
    */
   public Token<AMRMTokenIdentifier> launchUAM(String uamId, Configuration conf,
       ApplicationId appId, String queueName, String submitter,
-      String appNameSuffix, boolean keepContainersAcrossApplicationAttempts)
-      throws YarnException, IOException {
+      String appNameSuffix, boolean keepContainersAcrossApplicationAttempts,
+      String rmName) throws YarnException, IOException {
 
     if (this.unmanagedAppMasterMap.containsKey(uamId)) {
       throw new YarnException("UAM " + uamId + " already exists");
     }
     UnmanagedApplicationManager uam = createUAM(conf, appId, queueName,
-        submitter, appNameSuffix, keepContainersAcrossApplicationAttempts);
+        submitter, appNameSuffix, keepContainersAcrossApplicationAttempts,
+        rmName);
     // Put the UAM into map first before initializing it to avoid additional UAM
     // for the same uamId being created concurrently
     this.unmanagedAppMasterMap.put(uamId, uam);
@@ -248,19 +251,20 @@ public class UnmanagedAMPoolManager extends AbstractService {
    * @param submitter submitter name of the UAM
    * @param appNameSuffix application name suffix for the UAM
    * @param uamToken UAM token
+   * @param rmName name of the YarnRM
    * @throws YarnException if fails
    * @throws IOException if fails
    */
-  public void reAttachUAM(String uamId, Configuration conf,
-      ApplicationId appId, String queueName, String submitter,
-      String appNameSuffix, Token<AMRMTokenIdentifier> uamToken)
+  public void reAttachUAM(String uamId, Configuration conf, ApplicationId appId,
+      String queueName, String submitter, String appNameSuffix,
+      Token<AMRMTokenIdentifier> uamToken, String rmName)
       throws YarnException, IOException {
 
     if (this.unmanagedAppMasterMap.containsKey(uamId)) {
       throw new YarnException("UAM " + uamId + " already exists");
     }
-    UnmanagedApplicationManager uam =
-        createUAM(conf, appId, queueName, submitter, appNameSuffix, true);
+    UnmanagedApplicationManager uam = createUAM(conf, appId, queueName,
+        submitter, appNameSuffix, true, rmName);
     // Put the UAM into map first before initializing it to avoid additional UAM
     // for the same uamId being created concurrently
     this.unmanagedAppMasterMap.put(uamId, uam);
@@ -287,14 +291,16 @@ public class UnmanagedAMPoolManager extends AbstractService {
    * @param submitter submitter name of the application
    * @param appNameSuffix application name suffix
    * @param keepContainersAcrossApplicationAttempts keep container flag for UAM
+   * @param rmName name of the YarnRM
    * @return the UAM instance
    */
   @VisibleForTesting
   protected UnmanagedApplicationManager createUAM(Configuration conf,
       ApplicationId appId, String queueName, String submitter,
-      String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
+      String appNameSuffix, boolean keepContainersAcrossApplicationAttempts,
+      String rmName) {
     return new UnmanagedApplicationManager(conf, appId, queueName, submitter,
-        appNameSuffix, keepContainersAcrossApplicationAttempts);
+        appNameSuffix, keepContainersAcrossApplicationAttempts, rmName);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b9bfdf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
index 78dcfb6..7c1e154 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
@@ -116,13 +116,14 @@ public class UnmanagedApplicationManager {
    * @param queueName the queue of the UAM
    * @param submitter user name of the app
    * @param appNameSuffix the app name suffix to use
+   * @param rmName name of the YarnRM
    * @param keepContainersAcrossApplicationAttempts keep container flag for UAM
    *          recovery. See {@link ApplicationSubmissionContext
    *          #setKeepContainersAcrossApplicationAttempts(boolean)}
    */
   public UnmanagedApplicationManager(Configuration conf, ApplicationId appId,
       String queueName, String submitter, String appNameSuffix,
-      boolean keepContainersAcrossApplicationAttempts) {
+      boolean keepContainersAcrossApplicationAttempts, String rmName) {
     Preconditions.checkNotNull(conf, "Configuration cannot be null");
     Preconditions.checkNotNull(appId, "ApplicationId cannot be null");
     Preconditions.checkNotNull(submitter, "App submitter cannot be null");
@@ -132,9 +133,11 @@ public class UnmanagedApplicationManager {
     this.queueName = queueName;
     this.submitter = submitter;
     this.appNameSuffix = appNameSuffix;
+    this.userUgi = null;
     this.heartbeatHandler =
         new AMHeartbeatRequestHandler(this.conf, this.applicationId);
-    this.rmProxyRelayer = null;
+    this.rmProxyRelayer =
+        new AMRMClientRelayer(null, this.applicationId, rmName);
     this.connectionInitiated = false;
     this.registerRequest = null;
     this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
@@ -186,9 +189,8 @@ public class UnmanagedApplicationManager {
       throws IOException {
     this.userUgi = UserGroupInformation.createProxyUser(
         this.applicationId.toString(), UserGroupInformation.getCurrentUser());
-    this.rmProxyRelayer =
-        new AMRMClientRelayer(createRMProxy(ApplicationMasterProtocol.class,
-            this.conf, this.userUgi, amrmToken), this.applicationId);
+    this.rmProxyRelayer.setRMClient(createRMProxy(
+        ApplicationMasterProtocol.class, this.conf, this.userUgi, amrmToken));
 
     this.heartbeatHandler.setAMRMClientRelayer(this.rmProxyRelayer);
     this.heartbeatHandler.setUGI(this.userUgi);
@@ -245,7 +247,7 @@ public class UnmanagedApplicationManager {
 
     this.heartbeatHandler.shutdown();
 
-    if (this.rmProxyRelayer == null) {
+    if (this.userUgi == null) {
       if (this.connectionInitiated) {
         // This is possible if the async launchUAM is still
         // blocked and retrying. Return a dummy response in this case.
@@ -299,7 +301,7 @@ public class UnmanagedApplicationManager {
     //
     // In case 2, we have already save the allocate request above, so if the
     // registration succeed later, no request is lost.
-    if (this.rmProxyRelayer == null) {
+    if (this.userUgi == null) {
       if (this.connectionInitiated) {
         LOG.info("Unmanaged AM still not successfully launched/registered yet."
             + " Saving the allocate request and send later.");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b9bfdf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java
index 4c84f0b..2c016d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java
@@ -140,7 +140,7 @@ public class TestAMRMClientRelayer {
     this.conf = new Configuration();
 
     this.mockAMS = new MockApplicationMasterService();
-    this.relayer = new AMRMClientRelayer(this.mockAMS, null);
+    this.relayer = new AMRMClientRelayer(this.mockAMS, null, "TEST");
 
     this.relayer.init(conf);
     this.relayer.start();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b9bfdf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/metrics/TestAMRMClientRelayerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/metrics/TestAMRMClientRelayerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/metrics/TestAMRMClientRelayerMetrics.java
new file mode 100644
index 0000000..ebbfae2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/metrics/TestAMRMClientRelayerMetrics.java
@@ -0,0 +1,513 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.metrics;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.AMRMClientRelayer;
+import org.apache.hadoop.yarn.server.metrics.AMRMClientRelayerMetrics.RequestType;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Unit test for AMRMClientRelayerMetrics.
+ */
+public class TestAMRMClientRelayerMetrics {
+
+  /**
+   * Mock AMS for easier testing and mocking of request/responses. It
+   * records the contents of the last allocate request and can be told to
+   * fail the next call once.
+   */
+  public static class MockApplicationMasterService
+      implements ApplicationMasterProtocol {
+
+    // One-shot flags: each is cleared when the corresponding exception is
+    // thrown.
+    private boolean failover = false;
+    private boolean exception = false;
+    // Contents of the most recent allocate request that was accepted.
+    private List<ResourceRequest> lastAsk;
+    private List<ContainerId> lastRelease;
+    private List<UpdateContainerRequest> lastUpdates;
+    private List<String> lastBlacklistAdditions;
+    private List<String> lastBlacklistRemovals;
+    // Response returned by every successful allocate call.
+    private AllocateResponse response = AllocateResponse
+        .newInstance(0, null, null, new ArrayList<NodeReport>(),
+            Resource.newInstance(0, 0), null, 0, null, null);
+
+    /**
+     * Throws once if the failover flag is set, clearing the flag first.
+     */
+    private void failIfRestarted()
+        throws ApplicationMasterNotRegisteredException {
+      if (!this.failover) {
+        return;
+      }
+      this.failover = false;
+      throw new ApplicationMasterNotRegisteredException("Mock RM restarted");
+    }
+
+    @Override
+    public RegisterApplicationMasterResponse registerApplicationMaster(
+        RegisterApplicationMasterRequest request)
+        throws YarnException, IOException {
+      return null;
+    }
+
+    @Override
+    public FinishApplicationMasterResponse finishApplicationMaster(
+        FinishApplicationMasterRequest request)
+        throws YarnException, IOException {
+      failIfRestarted();
+      return null;
+    }
+
+    @Override
+    public AllocateResponse allocate(AllocateRequest request)
+        throws YarnException, IOException {
+      failIfRestarted();
+      if (this.exception) {
+        this.exception = false;
+        throw new YarnException("Mock RM encountered exception");
+      }
+      this.lastAsk = request.getAskList();
+      this.lastRelease = request.getReleaseList();
+      this.lastUpdates = request.getUpdateRequests();
+      ResourceBlacklistRequest blacklist =
+          request.getResourceBlacklistRequest();
+      this.lastBlacklistAdditions = blacklist.getBlacklistAdditions();
+      this.lastBlacklistRemovals = blacklist.getBlacklistRemovals();
+      return response;
+    }
+
+    /**
+     * Makes the next allocate or finish call fail as if the RM restarted.
+     */
+    public void setFailoverFlag() {
+      this.failover = true;
+    }
+  }
+
+  private Configuration conf;
+  private MockApplicationMasterService mockAMS;
+  private String homeID = "home";
+  private AMRMClientRelayer homeRelayer;
+  private String uamID = "uam";
+  private AMRMClientRelayer uamRelayer;
+
+  private List<ResourceRequest> asks = new ArrayList<>();
+  private List<ContainerId> releases = new ArrayList<>();
+  private List<UpdateContainerRequest> updates = new ArrayList<>();
+  private List<String> blacklistAdditions = new ArrayList<>();
+  private List<String> blacklistRemoval = new ArrayList<>();
+
+  @Before
+  public void setup() throws YarnException, IOException {
+    this.conf = new Configuration();
+
+    this.mockAMS = new MockApplicationMasterService();
+
+    this.homeRelayer = new AMRMClientRelayer(this.mockAMS,
+        ApplicationId.newInstance(0, 0), this.homeID);
+    this.homeRelayer.init(conf);
+    this.homeRelayer.start();
+
+    this.homeRelayer.registerApplicationMaster(
+        RegisterApplicationMasterRequest.newInstance("", 0, ""));
+
+    this.uamRelayer = new AMRMClientRelayer(this.mockAMS,
+        ApplicationId.newInstance(0, 0), this.uamID);
+    this.uamRelayer.init(conf);
+    this.uamRelayer.start();
+
+    this.uamRelayer.registerApplicationMaster(
+        RegisterApplicationMasterRequest.newInstance("", 0, ""));
+
+    clearAllocateRequestLists();
+
+    AMRMClientRelayerMetrics.getInstance()
+        .setClientPending(homeID, RequestType.Guaranteed, 0);
+    AMRMClientRelayerMetrics.getInstance()
+        .setClientPending(homeID, RequestType.Opportunistic, 0);
+    AMRMClientRelayerMetrics.getInstance()
+        .setClientPending(homeID, RequestType.Promote, 0);
+    AMRMClientRelayerMetrics.getInstance()
+        .setClientPending(homeID, RequestType.Demote, 0);
+
+    AMRMClientRelayerMetrics.getInstance()
+        .setClientPending(uamID, RequestType.Guaranteed, 0);
+    AMRMClientRelayerMetrics.getInstance()
+        .setClientPending(uamID, RequestType.Opportunistic, 0);
+    AMRMClientRelayerMetrics.getInstance()
+        .setClientPending(uamID, RequestType.Promote, 0);
+    AMRMClientRelayerMetrics.getInstance()
+        .setClientPending(uamID, RequestType.Demote, 0);
+  }
+
+  /**
+   * Builds an allocate request from the current test lists. The ask list
+   * is passed as is; the other lists are passed as fresh copies.
+   */
+  private AllocateRequest getAllocateRequest() {
+    // Need to create a new one every time because rather than directly
+    // referring the lists, the protobuf impl makes a copy of the lists
+    List<ContainerId> releaseCopy = new ArrayList<>(this.releases);
+    List<String> additionsCopy = new ArrayList<>(this.blacklistAdditions);
+    List<String> removalsCopy = new ArrayList<>(this.blacklistRemoval);
+    List<UpdateContainerRequest> updateCopy = new ArrayList<>(this.updates);
+    ResourceBlacklistRequest blacklist =
+        ResourceBlacklistRequest.newInstance(additionsCopy, removalsCopy);
+
+    return AllocateRequest.newBuilder()
+        .responseId(0)
+        .progress(0)
+        .askList(asks)
+        .releaseList(releaseCopy)
+        .resourceBlacklistRequest(blacklist)
+        .updateRequests(updateCopy)
+        .build();
+  }
+
+  /**
+   * Empties every list that feeds {@link #getAllocateRequest()}.
+   */
+  private void clearAllocateRequestLists() {
+    List<?>[] requestLists = {this.asks, this.releases, this.updates,
+        this.blacklistAdditions, this.blacklistRemoval};
+    for (List<?> requestList : requestLists) {
+      requestList.clear();
+    }
+  }
+
+  /**
+   * Creates a PROMOTE_EXECUTION_TYPE update request (target execution type
+   * GUARANTEED) for the test container with the given id.
+   */
+  private static UpdateContainerRequest createPromote(int id){
+    ContainerId target = createContainerId(id);
+    Resource emptyResource = Resource.newInstance(0, 0);
+    return UpdateContainerRequest.newInstance(0, target,
+        ContainerUpdateType.PROMOTE_EXECUTION_TYPE, emptyResource,
+        ExecutionType.GUARANTEED);
+  }
+
+  /**
+   * Creates a DEMOTE_EXECUTION_TYPE update request (target execution type
+   * OPPORTUNISTIC) for the test container with the given id.
+   */
+  private static UpdateContainerRequest createDemote(int id){
+    ContainerId target = createContainerId(id);
+    Resource emptyResource = Resource.newInstance(0, 0);
+    return UpdateContainerRequest.newInstance(0, target,
+        ContainerUpdateType.DEMOTE_EXECUTION_TYPE, emptyResource,
+        ExecutionType.OPPORTUNISTIC);
+  }
+
+  /**
+   * Creates a container id with the given number under attempt 1 of the
+   * fixed test application (1, 1).
+   */
+  private static ContainerId createContainerId(int id) {
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    ApplicationAttemptId attemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    return ContainerId.newContainerId(attemptId, id);
+  }
+
+  /**
+   * Creates a resource request record populated from the arguments.
+   *
+   * @param id allocation request id
+   * @param resource resource name
+   * @param memory memory of the requested capability
+   * @param vCores virtual cores of the requested capability
+   * @param priority request priority
+   * @param execType requested execution type
+   * @param containers number of containers asked for
+   * @return the populated request
+   */
+  public ResourceRequest createResourceRequest(long id, String resource,
+      int memory, int vCores, int priority, ExecutionType execType,
+      int containers) {
+    Resource capability = Resource.newInstance(memory, vCores);
+    Priority requestPriority = Priority.newInstance(priority);
+    ExecutionTypeRequest execTypeRequest =
+        ExecutionTypeRequest.newInstance(execType);
+
+    ResourceRequest request = Records.newRecord(ResourceRequest.class);
+    request.setAllocationRequestId(id);
+    request.setResourceName(resource);
+    request.setCapability(capability);
+    request.setPriority(requestPriority);
+    request.setExecutionTypeRequest(execTypeRequest);
+    request.setNumContainers(containers);
+    return request;
+  }
+
+  /**
+   * Checks that the Guaranteed pending gauge is tracked separately for the
+   * home and UAM ids and follows changes to the ANY ask.
+   */
+  @Test
+  public void testGPending() throws YarnException, IOException {
+    AMRMClientRelayerMetrics metrics = AMRMClientRelayerMetrics.getInstance();
+
+    // Ask for two containers, one with location preference
+    this.asks.add(
+        createResourceRequest(0, "node", 2048, 1, 1, ExecutionType.GUARANTEED,
+            1));
+    this.asks.add(
+        createResourceRequest(0, "rack", 2048, 1, 1, ExecutionType.GUARANTEED,
+            1));
+    this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1,
+        ExecutionType.GUARANTEED, 2));
+    this.homeRelayer.allocate(getAllocateRequest());
+
+    Assert.assertEquals(2,
+        metrics.getPendingMetric(homeID, RequestType.Guaranteed).value());
+    Assert.assertEquals(0,
+        metrics.getPendingMetric(uamID, RequestType.Guaranteed).value());
+
+    // Ask from the uam
+    this.uamRelayer.allocate(getAllocateRequest());
+
+    Assert.assertEquals(2,
+        metrics.getPendingMetric(homeID, RequestType.Guaranteed).value());
+    Assert.assertEquals(2,
+        metrics.getPendingMetric(uamID, RequestType.Guaranteed).value());
+
+    // Update the any to ask for an extra container
+    ResourceRequest anyAsk = this.asks.get(2);
+    anyAsk.setNumContainers(3);
+    this.homeRelayer.allocate(getAllocateRequest());
+
+    Assert.assertEquals(3,
+        metrics.getPendingMetric(homeID, RequestType.Guaranteed).value());
+    Assert.assertEquals(2,
+        metrics.getPendingMetric(uamID, RequestType.Guaranteed).value());
+
+    // Lower the any ask again to pretend a container was allocated
+    anyAsk.setNumContainers(2);
+    this.homeRelayer.allocate(getAllocateRequest());
+
+    Assert.assertEquals(2,
+        metrics.getPendingMetric(homeID, RequestType.Guaranteed).value());
+    Assert.assertEquals(2,
+        metrics.getPendingMetric(uamID, RequestType.Guaranteed).value());
+  }
+
+  @Test
+  public void testPromotePending() throws YarnException, IOException {
+    // Ask to promote 3 containers (1, 2 and 3)
+    this.updates.add(createPromote(1));
+    this.updates.add(createPromote(2));
+    this.updates.add(createPromote(3));
+
+    this.homeRelayer.allocate(getAllocateRequest());
+
+    Assert.assertEquals(3, AMRMClientRelayerMetrics.getInstance()
+        .getPendingMetric(homeID, RequestType.Promote).value());
+
+    // Demote 2 containers (3 and 4); 3 was pending promote, so 2 remain
+    this.updates.remove(createPromote(3));
+    this.updates.add(createDemote(3));
+    this.updates.add(createDemote(4));
+
+    this.homeRelayer.allocate(getAllocateRequest());
+
+    Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
+        .getPendingMetric(homeID, RequestType.Promote).value());
+
+    // Let the RM respond with two successful promotions (2 and 5); only
+    // container 2 was pending promote
+    List<UpdatedContainer> updated = new ArrayList<>();
+    updated.add(UpdatedContainer
+        .newInstance(ContainerUpdateType.PROMOTE_EXECUTION_TYPE, Container
+            .newInstance(createContainerId(2), null, null, null, null, null)));
+    updated.add(UpdatedContainer
+        .newInstance(ContainerUpdateType.PROMOTE_EXECUTION_TYPE, Container
+            .newInstance(createContainerId(5), null, null, null, null, null)));
+    this.mockAMS.response.setUpdatedContainers(updated);
+
+    this.homeRelayer.allocate(getAllocateRequest());
+
+    Assert.assertEquals(1, AMRMClientRelayerMetrics.getInstance()
+        .getPendingMetric(homeID, RequestType.Promote).value());
+
+    // Stop asking to promote container 2 and clear the mocked response
+    this.mockAMS.response.getUpdatedContainers().clear();
+    this.updates.remove(createPromote(2));
+
+    // Let the RM respond with two completed containers (1 and 6); only
+    // container 1 was pending promote
+    List<ContainerStatus> completed = new ArrayList<>();
+    completed
+        .add(ContainerStatus.newInstance(createContainerId(1), null, "", 0));
+    completed
+        .add(ContainerStatus.newInstance(createContainerId(6), null, "", 0));
+    this.mockAMS.response.setCompletedContainersStatuses(completed);
+
+    this.homeRelayer.allocate(getAllocateRequest());
+
+    Assert.assertEquals(0, AMRMClientRelayerMetrics.getInstance()
+        .getPendingMetric(homeID, RequestType.Promote).value());
+  }
+
+  @Test
+  public void testCleanUpOnFinish() throws YarnException, IOException {
+    // Ask for two containers (ANY = 2), one with node/rack preference
+    this.asks.add(
+        createResourceRequest(0, "node", 2048, 1, 1, ExecutionType.GUARANTEED,
+            1));
+    this.asks.add(
+        createResourceRequest(0, "rack", 2048, 1, 1, ExecutionType.GUARANTEED,
+            1));
+    this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1,
+        ExecutionType.GUARANTEED, 2));
+
+    // Ask to promote 3 containers (1, 2 and 3)
+    this.updates.add(createPromote(1));
+    this.updates.add(createPromote(2));
+    this.updates.add(createPromote(3));
+
+    // Run the allocate call so the relayer starts tracking pending requests
+    this.homeRelayer.allocate(getAllocateRequest());
+
+    // After shutdown, the pending metrics should be reset to zero
+    this.homeRelayer.shutdown();
+
+    Assert.assertEquals(0, AMRMClientRelayerMetrics.getInstance()
+        .getPendingMetric(homeID, RequestType.Guaranteed).value());
+
+    Assert.assertEquals(0, AMRMClientRelayerMetrics.getInstance()
+        .getPendingMetric(homeID, RequestType.Promote).value());
+  }
+
+  @Test
+  public void testFailover() throws YarnException, IOException {
+    // Ask for two containers (ANY = 2), one with node/rack preference
+    this.asks.add(
+        createResourceRequest(0, "node", 2048, 1, 1, ExecutionType.GUARANTEED,
+            1));
+    this.asks.add(
+        createResourceRequest(0, "rack", 2048, 1, 1, ExecutionType.GUARANTEED,
+            1));
+    this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1,
+        ExecutionType.GUARANTEED, 2));
+
+    long previousSuccess = AMRMClientRelayerMetrics.getInstance()
+        .getHeartbeatSuccessMetric(homeID).value();
+    long previousFailover = AMRMClientRelayerMetrics.getInstance()
+        .getRMMasterSlaveSwitchMetric(homeID).value();
+    // Arm the failover flag on the mocked AMS before the allocate call
+    mockAMS.failover = true;
+    this.homeRelayer.allocate(getAllocateRequest());
+    // The failover metric should be incremented
+    Assert.assertEquals(++previousFailover,
+        AMRMClientRelayerMetrics.getInstance()
+        .getRMMasterSlaveSwitchMetric(homeID).value());
+
+    // The success metric should be incremented once
+    Assert.assertEquals(++previousSuccess,
+        AMRMClientRelayerMetrics.getInstance()
+            .getHeartbeatSuccessMetric(homeID).value());
+
+    Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
+        .getPendingMetric(homeID, RequestType.Guaranteed).value());
+
+    Assert.assertEquals(0, AMRMClientRelayerMetrics.getInstance()
+        .getPendingMetric(uamID, RequestType.Guaranteed).value());
+
+    // Issue the allocate call through the UAM relayer as well
+    this.uamRelayer.allocate(getAllocateRequest());
+
+    Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
+        .getPendingMetric(homeID, RequestType.Guaranteed).value());
+
+    Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
+        .getPendingMetric(uamID, RequestType.Guaranteed).value());
+
+    // Update the ANY request to ask for an extra container (2 -> 3)
+    this.asks.get(2).setNumContainers(3);
+    mockAMS.failover = true;
+    this.homeRelayer.allocate(getAllocateRequest());
+    // The failover metric should be incremented
+    Assert.assertEquals(++previousFailover,
+        AMRMClientRelayerMetrics.getInstance()
+            .getRMMasterSlaveSwitchMetric(homeID).value());
+
+    // The success metric should be incremented once
+    Assert.assertEquals(++previousSuccess,
+        AMRMClientRelayerMetrics.getInstance()
+            .getHeartbeatSuccessMetric(homeID).value());
+
+    Assert.assertEquals(3, AMRMClientRelayerMetrics.getInstance()
+        .getPendingMetric(homeID, RequestType.Guaranteed).value());
+
+    Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
+        .getPendingMetric(uamID, RequestType.Guaranteed).value());
+
+    // Lower the ANY request back to 2, as if one container was allocated
+    this.asks.get(2).setNumContainers(2);
+    mockAMS.failover = true;
+    this.homeRelayer.allocate(getAllocateRequest());
+    // The failover metric should be incremented
+    Assert.assertEquals(++previousFailover,
+        AMRMClientRelayerMetrics.getInstance()
+            .getRMMasterSlaveSwitchMetric(homeID).value());
+
+    // The success metric should be incremented once
+    Assert.assertEquals(++previousSuccess,
+        AMRMClientRelayerMetrics.getInstance()
+            .getHeartbeatSuccessMetric(homeID).value());
+
+    Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
+        .getPendingMetric(homeID, RequestType.Guaranteed).value());
+
+    Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
+        .getPendingMetric(uamID, RequestType.Guaranteed).value());
+
+    long previousFailure = AMRMClientRelayerMetrics.getInstance()
+        .getHeartbeatFailureMetric(homeID).value();
+
+    mockAMS.exception = true;
+    try{
+      this.homeRelayer.allocate(getAllocateRequest());
+      Assert.fail();
+    } catch (YarnException e){ // expected: mockAMS.exception was set
+    }
+    // The failover metric should not be incremented
+    Assert.assertEquals(previousFailover,
+        AMRMClientRelayerMetrics.getInstance()
+            .getRMMasterSlaveSwitchMetric(homeID).value());
+
+    // The success metric should not be incremented
+    Assert.assertEquals(previousSuccess,
+        AMRMClientRelayerMetrics.getInstance()
+            .getHeartbeatSuccessMetric(homeID).value());
+
+    // The failure metric should be incremented
+    Assert.assertEquals(++previousFailure,
+        AMRMClientRelayerMetrics.getInstance()
+            .getHeartbeatFailureMetric(homeID).value());
+
+    mockAMS.failover = true;
+    mockAMS.exception = true;
+    try{
+      this.homeRelayer.allocate(getAllocateRequest());
+      Assert.fail();
+    } catch (YarnException e){ // expected: mockAMS.exception was set
+    }
+    // The failover metric should be incremented even though allocate failed
+    Assert.assertEquals(++previousFailover,
+        AMRMClientRelayerMetrics.getInstance()
+            .getRMMasterSlaveSwitchMetric(homeID).value());
+
+    // The success metric should not be incremented
+    Assert.assertEquals(previousSuccess,
+        AMRMClientRelayerMetrics.getInstance()
+            .getHeartbeatSuccessMetric(homeID).value());
+
+    // The failure metric should be incremented
+    Assert.assertEquals(++previousFailure,
+        AMRMClientRelayerMetrics.getInstance()
+            .getHeartbeatFailureMetric(homeID).value());
+  }
+
+  @Test
+  public void testNewEmptyRequest()
+      throws YarnException, IOException {
+    // Ask for zero containers; nothing should be counted as pending
+    this.asks.add(createResourceRequest(1, ResourceRequest.ANY, 2048, 1, 1,
+        ExecutionType.GUARANTEED, 0));
+    this.homeRelayer.allocate(getAllocateRequest());
+
+    Assert.assertEquals(0, AMRMClientRelayerMetrics.getInstance()
+        .getPendingMetric(homeID, RequestType.Guaranteed).value());
+
+    Assert.assertEquals(0, AMRMClientRelayerMetrics.getInstance()
+        .getPendingMetric(uamID, RequestType.Guaranteed).value());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b9bfdf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
index 5848d3f..b6bb0da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
@@ -374,7 +374,7 @@ public class TestUnmanagedApplicationManager {
         ApplicationId appId, String queueName, String submitter,
         String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
       super(conf, appId, queueName, submitter, appNameSuffix,
-          keepContainersAcrossApplicationAttempts);
+          keepContainersAcrossApplicationAttempts, "TEST");
     }
 
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b9bfdf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index eb818f1..1bf882f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -254,7 +254,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     this.homeSubClusterId =
         SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
     this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext,
-        ApplicationMasterProtocol.class, this.appOwner), appId);
+        ApplicationMasterProtocol.class, this.appOwner), appId,
+        this.homeSubClusterId.toString());
 
     this.federationFacade = FederationStateStoreFacade.getInstance();
     this.subClusterResolver = this.federationFacade.getSubClusterResolver();
@@ -340,7 +341,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
               this.attemptId.getApplicationId(),
               this.amRegistrationResponse.getQueue(),
               getApplicationContext().getUser(), this.homeSubClusterId.getId(),
-              entry.getValue());
+              entry.getValue(), subClusterId.toString());
 
           this.secondaryRelayers.put(subClusterId.getId(),
               this.uamPool.getAMRMClientRelayer(subClusterId.getId()));
@@ -666,7 +667,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
                   uamPool.finishApplicationMaster(subClusterId, finishRequest);
 
               if (uamResponse.getIsUnregistered()) {
-                secondaryRelayers.remove(subClusterId);
+                AMRMClientRelayer relayer =
+                    secondaryRelayers.remove(subClusterId);
+                if(relayer != null) {
+                  relayer.shutdown();
+                }
 
                 if (getNMStateStore() != null) {
                   getNMStateStore().removeAMRMProxyAppContextEntry(attemptId,
@@ -753,6 +758,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       }
       this.threadpool = null;
     }
+    homeRMRelayer.shutdown();
+    for(AMRMClientRelayer relayer : secondaryRelayers.values()){
+      relayer.shutdown();
+    }
     super.shutdown();
   }
 
@@ -885,7 +894,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
                 uamPool.reAttachUAM(subClusterId.getId(), config, appId,
                     amRegistrationResponse.getQueue(),
                     getApplicationContext().getUser(), homeSubClusterId.getId(),
-                    amrmToken);
+                    amrmToken, subClusterId.toString());
 
                 secondaryRelayers.put(subClusterId.getId(),
                     uamPool.getAMRMClientRelayer(subClusterId.getId()));
@@ -1136,7 +1145,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
                   token = uamPool.launchUAM(subClusterId, config,
                       attemptId.getApplicationId(),
                       amRegistrationResponse.getQueue(), appContext.getUser(),
-                      homeSubClusterId.toString(), true);
+                      homeSubClusterId.toString(), true, subClusterId);
 
                   secondaryRelayers.put(subClusterId,
                       uamPool.getAMRMClientRelayer(subClusterId));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b9bfdf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
index 1088c69..33617d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
@@ -122,7 +122,8 @@ public class TestableFederationInterceptor extends FederationInterceptor {
     @Override
     public UnmanagedApplicationManager createUAM(Configuration conf,
         ApplicationId appId, String queueName, String submitter,
-        String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
+        String appNameSuffix, boolean keepContainersAcrossApplicationAttempts,
+        String rmId) {
       return new TestableUnmanagedApplicationManager(conf, appId, queueName,
           submitter, appNameSuffix, keepContainersAcrossApplicationAttempts);
     }
@@ -139,7 +140,7 @@ public class TestableFederationInterceptor extends FederationInterceptor {
         ApplicationId appId, String queueName, String submitter,
         String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
       super(conf, appId, queueName, submitter, appNameSuffix,
-          keepContainersAcrossApplicationAttempts);
+          keepContainersAcrossApplicationAttempts, "TEST");
     }
 
     /**


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org