You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gi...@apache.org on 2018/09/12 18:47:08 UTC
hadoop git commit: YARN-8658. [AMRMProxy] Metrics for
AMRMClientRelayer inside FederationInterceptor. Contributed by Young Chen.
Repository: hadoop
Updated Branches:
refs/heads/trunk 64c7a12b5 -> 02b9bfdf9
YARN-8658. [AMRMProxy] Metrics for AMRMClientRelayer inside FederationInterceptor. Contributed by Young Chen.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/02b9bfdf
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/02b9bfdf
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/02b9bfdf
Branch: refs/heads/trunk
Commit: 02b9bfdf9e4bd0b3c05ca5fd75399dedcb656e09
Parents: 64c7a12
Author: Giovanni Matteo Fumarola <gi...@apache.org>
Authored: Wed Sep 12 11:46:35 2018 -0700
Committer: Giovanni Matteo Fumarola <gi...@apache.org>
Committed: Wed Sep 12 11:46:35 2018 -0700
----------------------------------------------------------------------
.../hadoop/yarn/server/AMRMClientRelayer.java | 412 ++++++++++-----
.../metrics/AMRMClientRelayerMetrics.java | 368 +++++++++++++
.../yarn/server/metrics/package-info.java | 18 +
.../yarn/server/uam/UnmanagedAMPoolManager.java | 30 +-
.../server/uam/UnmanagedApplicationManager.java | 16 +-
.../yarn/server/TestAMRMClientRelayer.java | 2 +-
.../metrics/TestAMRMClientRelayerMetrics.java | 513 +++++++++++++++++++
.../uam/TestUnmanagedApplicationManager.java | 2 +-
.../amrmproxy/FederationInterceptor.java | 19 +-
.../TestableFederationInterceptor.java | 5 +-
10 files changed, 1242 insertions(+), 143 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b9bfdf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
index 1e2060c..2621d3e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRespons
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
@@ -52,6 +53,7 @@ import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.metrics.AMRMClientRelayerMetrics;
import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet;
import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSetKey;
import org.slf4j.Logger;
@@ -98,6 +100,15 @@ public class AMRMClientRelayer extends AbstractService
private Set<ResourceRequest> ask =
new TreeSet<>(new ResourceRequest.ResourceRequestComparator());
+ /**
+ * Data structures for pending and allocate latency metrics. This only applies
+ * for requests with non-zero allocationRequestId.
+ */
+ private Map<Long, Integer> pendingCountForMetrics = new HashMap<>();
+ private Map<Long, Long> askTimeStamp = new HashMap<>();
+ // List of allocated containerId to avoid double counting
+ private Set<ContainerId> knownContainers = new HashSet<>();
+
private Set<ContainerId> remotePendingRelease = new HashSet<>();
private Set<ContainerId> release = new HashSet<>();
@@ -108,6 +119,7 @@ public class AMRMClientRelayer extends AbstractService
private Map<ContainerId, UpdateContainerRequest> remotePendingChange =
new HashMap<>();
private Map<ContainerId, UpdateContainerRequest> change = new HashMap<>();
+ private Map<ContainerId, Long> changeTimeStamp = new HashMap<>();
private Map<Set<String>, List<SchedulingRequest>> remotePendingSchedRequest =
new HashMap<>();
@@ -119,16 +131,26 @@ public class AMRMClientRelayer extends AbstractService
// heartbeat
private volatile int resetResponseId;
+ private String rmId = "";
+ private volatile boolean shutdown = false;
+
+ private AMRMClientRelayerMetrics metrics;
+
public AMRMClientRelayer() {
super(AMRMClientRelayer.class.getName());
this.resetResponseId = -1;
+ this.metrics = AMRMClientRelayerMetrics.getInstance();
+ this.rmClient = null;
+ this.appId = null;
+ this.rmId = "";
}
public AMRMClientRelayer(ApplicationMasterProtocol rmClient,
- ApplicationId appId) {
+ ApplicationId appId, String rmId) {
this();
this.rmClient = rmClient;
this.appId = appId;
+ this.rmId = rmId;
}
@Override
@@ -155,6 +177,7 @@ public class AMRMClientRelayer extends AbstractService
if (this.rmClient != null) {
RPC.stopProxy(this.rmClient);
}
+ shutdown();
super.serviceStop();
}
@@ -163,6 +186,49 @@ public class AMRMClientRelayer extends AbstractService
this.amRegistrationRequest = registerRequest;
}
+ public void setRMClient(ApplicationMasterProtocol client){
+ this.rmClient = client;
+ }
+
+ public void shutdown() {
+ // On finish, clear out our pending count from the metrics
+ // and set the shut down flag so no more pending requests get
+ // added
+ synchronized (this) {
+ if (this.shutdown) {
+ LOG.warn(
+ "Shutdown called twice for AMRMClientRelayer for RM " + this.rmId);
+ return;
+ }
+ this.shutdown = true;
+ for (Map.Entry<ResourceRequestSetKey, ResourceRequestSet> entry
+ : this.remotePendingAsks .entrySet()) {
+ ResourceRequestSetKey key = entry.getKey();
+ if (key.getAllocationRequestId() == 0) {
+ this.metrics.decrClientPending(this.rmId,
+ AMRMClientRelayerMetrics.getRequestType(key.getExeType()),
+ entry.getValue().getNumContainers());
+ } else {
+ this.askTimeStamp.remove(key.getAllocationRequestId());
+ Integer pending =
+ this.pendingCountForMetrics.remove(key.getAllocationRequestId());
+ if (pending == null) {
+ throw new YarnRuntimeException(
+ "pendingCountForMetrics not found for key " + key
+ + " during shutdown");
+ }
+ this.metrics.decrClientPending(this.rmId,
+ AMRMClientRelayerMetrics.getRequestType(key.getExeType()),
+ pending);
+ }
+ }
+ for(UpdateContainerRequest req : remotePendingChange.values()) {
+ this.metrics
+ .decrClientPending(rmId, req.getContainerUpdateType(), 1);
+ }
+ }
+ }
+
@Override
public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request)
@@ -178,7 +244,8 @@ public class AMRMClientRelayer extends AbstractService
try {
return this.rmClient.finishApplicationMaster(request);
} catch (ApplicationMasterNotRegisteredException e) {
- LOG.warn("Out of sync with RM for " + this.appId + ", hence resyncing.");
+ LOG.warn("Out of sync with RM " + rmId
+ + " for " + this.appId + ", hence resyncing.");
// re register with RM
registerApplicationMaster(this.amRegistrationRequest);
return finishApplicationMaster(request);
@@ -215,7 +282,23 @@ public class AMRMClientRelayer extends AbstractService
if (allocateRequest.getUpdateRequests() != null) {
for (UpdateContainerRequest update : allocateRequest
.getUpdateRequests()) {
- this.remotePendingChange.put(update.getContainerId(), update);
+ UpdateContainerRequest req =
+ this.remotePendingChange.put(update.getContainerId(), update);
+ this.changeTimeStamp
+ .put(update.getContainerId(), System.currentTimeMillis());
+ if (req == null) {
+ // If this is a brand new request, all we have to do is increment
+ this.metrics
+ .incrClientPending(rmId, update.getContainerUpdateType(), 1);
+ } else if (req.getContainerUpdateType() != update
+ .getContainerUpdateType()) {
+ // If this is replacing a request with a different update type, we
+ // need to decrement the replaced type
+ this.metrics
+ .decrClientPending(rmId, req.getContainerUpdateType(), 1);
+ this.metrics
+ .incrClientPending(rmId, update.getContainerUpdateType(), 1);
+ }
this.change.put(update.getContainerId(), update);
}
}
@@ -232,141 +315,196 @@ public class AMRMClientRelayer extends AbstractService
public AllocateResponse allocate(AllocateRequest allocateRequest)
throws YarnException, IOException {
AllocateResponse allocateResponse = null;
- try {
- synchronized (this) {
- addNewAllocateRequest(allocateRequest);
+ long startTime = System.currentTimeMillis();
+ synchronized (this) {
+ if(this.shutdown){
+ throw new YarnException("Allocate called after AMRMClientRelayer for "
+ + "RM " + rmId + " shutdown.");
+ }
+ addNewAllocateRequest(allocateRequest);
- ArrayList<ResourceRequest> askList = new ArrayList<>(ask.size());
- for (ResourceRequest r : ask) {
- // create a copy of ResourceRequest as we might change it while the
- // RPC layer is using it to send info across
- askList.add(ResourceRequest.clone(r));
- }
+ ArrayList<ResourceRequest> askList = new ArrayList<>(ask.size());
+ for (ResourceRequest r : ask) {
+ // create a copy of ResourceRequest as we might change it while the
+ // RPC layer is using it to send info across
+ askList.add(ResourceRequest.clone(r));
+ }
- allocateRequest = AllocateRequest.newBuilder()
- .responseId(allocateRequest.getResponseId())
- .progress(allocateRequest.getProgress()).askList(askList)
- .releaseList(new ArrayList<>(this.release))
- .resourceBlacklistRequest(ResourceBlacklistRequest.newInstance(
- new ArrayList<>(this.blacklistAdditions),
- new ArrayList<>(this.blacklistRemovals)))
- .updateRequests(new ArrayList<>(this.change.values()))
- .schedulingRequests(new ArrayList<>(this.schedulingRequest))
- .build();
-
- if (this.resetResponseId != -1) {
- LOG.info("Override allocate responseId from "
- + allocateRequest.getResponseId() + " to " + this.resetResponseId
- + " for " + this.appId);
- allocateRequest.setResponseId(this.resetResponseId);
- }
+ allocateRequest = AllocateRequest.newBuilder()
+ .responseId(allocateRequest.getResponseId())
+ .progress(allocateRequest.getProgress()).askList(askList)
+ .releaseList(new ArrayList<>(this.release))
+ .resourceBlacklistRequest(ResourceBlacklistRequest.newInstance(
+ new ArrayList<>(this.blacklistAdditions),
+ new ArrayList<>(this.blacklistRemovals)))
+ .updateRequests(new ArrayList<>(this.change.values()))
+ .schedulingRequests(new ArrayList<>(this.schedulingRequest))
+ .build();
+
+ if (this.resetResponseId != -1) {
+ LOG.info("Override allocate responseId from "
+ + allocateRequest.getResponseId() + " to " + this.resetResponseId
+ + " for " + this.appId);
+ allocateRequest.setResponseId(this.resetResponseId);
}
+ }
- // Do the actual allocate call
- try {
- allocateResponse = this.rmClient.allocate(allocateRequest);
-
- // Heartbeat succeeded, wipe out responseId overriding
- this.resetResponseId = -1;
- } catch (ApplicationMasterNotRegisteredException e) {
- LOG.warn("ApplicationMaster is out of sync with RM for " + this.appId
- + " hence resyncing.");
-
- synchronized (this) {
- // Add all remotePending data into to-send data structures
- for (ResourceRequestSet requestSet : this.remotePendingAsks
- .values()) {
- for (ResourceRequest rr : requestSet.getRRs()) {
- addResourceRequestToAsk(rr);
- }
- }
- this.release.addAll(this.remotePendingRelease);
- this.blacklistAdditions.addAll(this.remoteBlacklistedNodes);
- this.change.putAll(this.remotePendingChange);
- for (List<SchedulingRequest> reqs : this.remotePendingSchedRequest
- .values()) {
- this.schedulingRequest.addAll(reqs);
+ // Do the actual allocate call
+ try {
+ allocateResponse = this.rmClient.allocate(allocateRequest);
+
+ // Heartbeat succeeded, wipe out responseId overriding
+ this.resetResponseId = -1;
+ } catch (ApplicationMasterNotRegisteredException e) {
+      // This is a retriable exception - we will re-register and make a
+      // recursive call to retry
+ LOG.warn("ApplicationMaster is out of sync with RM " + rmId
+ + " for " + this.appId + ", hence resyncing.");
+
+ this.metrics.incrRMMasterSlaveSwitch(this.rmId);
+
+ synchronized (this) {
+ // Add all remotePending data into to-send data structures
+ for (ResourceRequestSet requestSet : this.remotePendingAsks
+ .values()) {
+ for (ResourceRequest rr : requestSet.getRRs()) {
+ addResourceRequestToAsk(rr);
}
}
+ this.release.addAll(this.remotePendingRelease);
+ this.blacklistAdditions.addAll(this.remoteBlacklistedNodes);
+ this.change.putAll(this.remotePendingChange);
+ for (List<SchedulingRequest> reqs : this.remotePendingSchedRequest
+ .values()) {
+ this.schedulingRequest.addAll(reqs);
+ }
+ }
- // re-register with RM, then retry allocate recursively
- registerApplicationMaster(this.amRegistrationRequest);
- // Reset responseId after re-register
- allocateRequest.setResponseId(0);
- return allocate(allocateRequest);
- } catch (Throwable t) {
-
- // If RM is complaining about responseId out of sync, force reset next
- // time
- if (t instanceof InvalidApplicationMasterRequestException) {
- int responseId = AMRMClientUtils
- .parseExpectedResponseIdFromException(t.getMessage());
- if (responseId != -1) {
- this.resetResponseId = responseId;
- LOG.info("ResponseId out of sync with RM, expect " + responseId
- + " but " + allocateRequest.getResponseId() + " used by "
- + this.appId + ". Will override in the next allocate.");
- } else {
- LOG.warn("Failed to parse expected responseId out of exception for "
- + this.appId);
- }
+ // re-register with RM, then retry allocate recursively
+ registerApplicationMaster(this.amRegistrationRequest);
+ // Reset responseId after re-register
+ allocateRequest.setResponseId(0);
+ allocateResponse = allocate(allocateRequest);
+ return allocateResponse;
+ } catch (Throwable t) {
+ // Unexpected exception - rethrow and increment heart beat failure metric
+ this.metrics.addHeartbeatFailure(this.rmId,
+ System.currentTimeMillis() - startTime);
+
+ // If RM is complaining about responseId out of sync, force reset next
+ // time
+ if (t instanceof InvalidApplicationMasterRequestException) {
+ int responseId = AMRMClientUtils
+ .parseExpectedResponseIdFromException(t.getMessage());
+ if (responseId != -1) {
+ this.resetResponseId = responseId;
+ LOG.info("ResponseId out of sync with RM, expect " + responseId
+ + " but " + allocateRequest.getResponseId() + " used by "
+ + this.appId + ". Will override in the next allocate.");
+ } else {
+ LOG.warn("Failed to parse expected responseId out of exception for "
+ + this.appId);
}
+ }
+
+ throw t;
+ }
- throw t;
+ synchronized (this) {
+ if (this.shutdown) {
+ throw new YarnException("Allocate call succeeded for " + this.appId
+ + " after AMRMClientRelayer for RM " + rmId + " shutdown.");
}
- synchronized (this) {
- // Process the allocate response from RM
- if (allocateResponse.getCompletedContainersStatuses() != null) {
- for (ContainerStatus container : allocateResponse
- .getCompletedContainersStatuses()) {
- this.remotePendingRelease.remove(container.getContainerId());
- this.remotePendingChange.remove(container.getContainerId());
- }
- }
+ updateMetrics(allocateResponse, startTime);
- if (allocateResponse.getUpdatedContainers() != null) {
- for (UpdatedContainer updatedContainer : allocateResponse
- .getUpdatedContainers()) {
- this.remotePendingChange
- .remove(updatedContainer.getContainer().getId());
+ AMRMClientUtils.removeFromOutstandingSchedulingRequests(
+ allocateResponse.getAllocatedContainers(),
+ this.remotePendingSchedRequest);
+ AMRMClientUtils.removeFromOutstandingSchedulingRequests(
+ allocateResponse.getContainersFromPreviousAttempts(),
+ this.remotePendingSchedRequest);
+
+ this.ask.clear();
+ this.release.clear();
+
+ this.blacklistAdditions.clear();
+ this.blacklistRemovals.clear();
+
+ this.change.clear();
+ this.schedulingRequest.clear();
+ return allocateResponse;
+ }
+ }
+
+ private void updateMetrics(AllocateResponse allocateResponse,
+ long startTime) {
+ this.metrics.addHeartbeatSuccess(this.rmId,
+ System.currentTimeMillis() - startTime);
+ // Process the allocate response from RM
+ if (allocateResponse.getAllocatedContainers() != null) {
+ for (Container container : allocateResponse
+ .getAllocatedContainers()) {
+ // Do not update metrics aggressively for AllocationRequestId zero
+          // case. Also avoid double counting due to re-send
+ if (this.knownContainers.add(container.getId())) {
+ this.metrics.addFulfilledQPS(this.rmId, AMRMClientRelayerMetrics
+ .getRequestType(container.getExecutionType()), 1);
+ if (container.getAllocationRequestId() != 0) {
+ Integer count = this.pendingCountForMetrics
+ .get(container.getAllocationRequestId());
+ if (count != null && count > 0) {
+ this.pendingCountForMetrics
+ .put(container.getAllocationRequestId(), --count);
+ this.metrics.decrClientPending(this.rmId,
+ AMRMClientRelayerMetrics
+ .getRequestType(container.getExecutionType()), 1);
+ this.metrics.addFulfillLatency(this.rmId,
+ AMRMClientRelayerMetrics
+ .getRequestType(container.getExecutionType()),
+ System.currentTimeMillis() - this.askTimeStamp
+ .get(container.getAllocationRequestId()));
+ }
}
}
-
- AMRMClientUtils.removeFromOutstandingSchedulingRequests(
- allocateResponse.getAllocatedContainers(),
- this.remotePendingSchedRequest);
- AMRMClientUtils.removeFromOutstandingSchedulingRequests(
- allocateResponse.getContainersFromPreviousAttempts(),
- this.remotePendingSchedRequest);
}
+ }
+ if (allocateResponse.getCompletedContainersStatuses() != null) {
+ for (ContainerStatus container : allocateResponse
+ .getCompletedContainersStatuses()) {
+ this.remotePendingRelease.remove(container.getContainerId());
+ UpdateContainerRequest req =
+ this.remotePendingChange.remove(container.getContainerId());
+ if (req != null) {
+ this.metrics
+ .decrClientPending(rmId, req.getContainerUpdateType(), 1);
+ }
+ this.knownContainers.remove(container.getContainerId());
+ }
+ }
- } finally {
- synchronized (this) {
- /*
- * If allocateResponse is null, it means exception happened and RM did
- * not accept the request. Don't clear any data structures so that they
- * will be re-sent next time.
- *
- * Otherwise request was accepted by RM, we are safe to clear these.
- */
- if (allocateResponse != null) {
- this.ask.clear();
- this.release.clear();
-
- this.blacklistAdditions.clear();
- this.blacklistRemovals.clear();
-
- this.change.clear();
- this.schedulingRequest.clear();
+ if (allocateResponse.getUpdatedContainers() != null) {
+ for (UpdatedContainer updatedContainer : allocateResponse
+ .getUpdatedContainers()) {
+ UpdateContainerRequest req = this.remotePendingChange
+ .remove(updatedContainer.getContainer().getId());
+ if (req != null) {
+ this.metrics
+ .decrClientPending(rmId, req.getContainerUpdateType(), 1);
+ this.metrics.addFulfillLatency(rmId, req.getContainerUpdateType(),
+ System.currentTimeMillis() - this.changeTimeStamp
+ .remove(req.getContainerId()));
+ this.metrics
+ .addFulfilledQPS(rmId, req.getContainerUpdateType(), 1);
}
}
}
- return allocateResponse;
+
}
private void addNewAsks(List<ResourceRequest> asks) throws YarnException {
Set<ResourceRequestSetKey> touchedKeys = new HashSet<>();
+ Set<ResourceRequestSetKey> nonZeroNewKeys = new HashSet<>();
for (ResourceRequest rr : asks) {
addResourceRequestToAsk(rr);
@@ -377,8 +515,38 @@ public class AMRMClientRelayer extends AbstractService
if (askSet == null) {
askSet = new ResourceRequestSet(key);
this.remotePendingAsks.put(key, askSet);
+ if (key.getAllocationRequestId() != 0) {
+ nonZeroNewKeys.add(key);
+ }
}
+
+ int numContainers = askSet.getNumContainers();
askSet.addAndOverrideRR(rr);
+ int deltaContainers = askSet.getNumContainers() - numContainers;
+
+ if (key.getAllocationRequestId() == 0) {
+ // AllocationRequestId is zero, keep track of pending count in the
+ // delayed but correct way. Allocation latency is not supported
+ if (deltaContainers != 0) {
+ this.metrics.incrClientPending(this.rmId,
+ AMRMClientRelayerMetrics.getRequestType(key.getExeType()),
+ deltaContainers);
+ if(deltaContainers > 0){
+ this.metrics.addRequestedQPS(this.rmId,
+ AMRMClientRelayerMetrics.getRequestType(key.getExeType()),
+ deltaContainers);
+ }
+ }
+ } else {
+ // AllocationRequestId is non-zero, we do pending decrement and latency
+ // aggressively. So don't update metrics here. Double check AM is not
+ // reusing the requestId for more asks
+ if (deltaContainers > 0 && numContainers != 0) {
+ throw new YarnException("Received new ask ("
+ + askSet.getNumContainers() + ") on top of existing ("
+ + numContainers + ") in key " + key);
+ }
+ }
}
// Cleanup properly if needed
@@ -391,6 +559,20 @@ public class AMRMClientRelayer extends AbstractService
askSet.cleanupZeroNonAnyRR();
}
}
+
+ // Initialize data for pending metrics for each new key
+ for (ResourceRequestSetKey key : nonZeroNewKeys) {
+ if(remotePendingAsks.containsKey(key)){
+ this.askTimeStamp.put(key.getAllocationRequestId(),
+ System.currentTimeMillis());
+ int count = this.remotePendingAsks.get(key).getNumContainers();
+ this.pendingCountForMetrics.put(key.getAllocationRequestId(), count);
+ this.metrics.incrClientPending(this.rmId,
+ AMRMClientRelayerMetrics.getRequestType(key.getExeType()), count);
+ this.metrics.addRequestedQPS(this.rmId,
+ AMRMClientRelayerMetrics.getRequestType(key.getExeType()), count);
+ }
+ }
}
private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b9bfdf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/AMRMClientRelayerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/AMRMClientRelayerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/AMRMClientRelayerMetrics.java
new file mode 100644
index 0000000..6ce5851
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/AMRMClientRelayerMetrics.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+
+/**
+ * Metrics for FederationInterceptor Internals.
+ */
+@InterfaceAudience.Private
+@Metrics(about = "Performance and usage metrics for YARN AMRMClientRelayer",
+ context = "fedr")
+public final class AMRMClientRelayerMetrics implements MetricsSource{
+
+ /**
+ * Easier classification of request types for logging metrics.
+ */
+ public enum RequestType {
+ Guaranteed, Opportunistic, Promote, Demote;
+
+ @Override
+ public String toString() {
+ switch (this) {
+ case Guaranteed:
+ return "G";
+ case Opportunistic:
+ return "O";
+ case Promote:
+ return "P";
+ case Demote:
+ return "D";
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+ }
+
+ private static AtomicBoolean isInitialized = new AtomicBoolean(false);
+
+ private static final MetricsInfo RECORD_INFO =
+ info("AMRMClientRelayerMetrics",
+ "Metrics for the Yarn AMRMClientRelayer");
+
+ private static volatile AMRMClientRelayerMetrics instance = null;
+ private static MetricsRegistry registry;
+
+ // The metrics are set up as a map from string (typically sub cluster id) to
+ // request type (Guaranteed, Opp, Promote, Demote) to the counter.
+ // The counters are constructed lazily when the first metric entry
+ // comes in.
+ // For some metrics, request type is not applicable.
+ private final Map<String, Map<RequestType, MutableGaugeLong>>
+ rmClientPending = new ConcurrentHashMap<>();
+
+ private final Map<String, Map<RequestType, MutableQuantiles>> fulfillLatency =
+ new ConcurrentHashMap<>();
+
+ private final Map<String, Map<RequestType, MutableGaugeLong>>
+ requestedQps = new ConcurrentHashMap<>();
+
+ private final Map<String, Map<RequestType, MutableGaugeLong>>
+ fulfilledQps = new ConcurrentHashMap<>();
+
+ private final Map<String, MutableGaugeLong> rmMasterSlaveSwitch =
+ new ConcurrentHashMap<>();
+
+ private final Map<String, MutableGaugeLong> heartbeatFailure =
+ new ConcurrentHashMap<>();
+
+ private final Map<String, MutableGaugeLong> heartbeatSuccess =
+ new ConcurrentHashMap<>();
+ private final Map<String, MutableQuantiles> heartbeatLatency =
+ new ConcurrentHashMap<>();
+
+ /**
+ * Initialize the singleton instance.
+ *
+ * @return the singleton
+ */
+ public static AMRMClientRelayerMetrics getInstance() {
+ if (!isInitialized.get()) {
+ synchronized (AMRMClientRelayerMetrics.class) {
+ if (instance == null) {
+ instance = new AMRMClientRelayerMetrics();
+ DefaultMetricsSystem.instance().register(RECORD_INFO.name(),
+ RECORD_INFO.description(), instance);
+ isInitialized.set(true);
+ }
+ }
+ }
+ return instance;
+ }
+
+ private AMRMClientRelayerMetrics() {
+ registry = new MetricsRegistry(RECORD_INFO);
+ registry.tag(RECORD_INFO, "AMRMClientRelayer");
+ }
+
+ public static RequestType getRequestType(ExecutionType execType) {
+ if (execType == null || execType.equals(ExecutionType.GUARANTEED)) {
+ return RequestType.Guaranteed;
+ }
+ return RequestType.Opportunistic;
+ }
+
+ @VisibleForTesting
+ protected MutableGaugeLong getPendingMetric(String instanceId,
+ RequestType type) {
+ synchronized (rmClientPending) {
+ if (!rmClientPending.containsKey(instanceId)) {
+ rmClientPending.put(instanceId,
+ new ConcurrentHashMap<RequestType, MutableGaugeLong>());
+ }
+ if (!rmClientPending.get(instanceId).containsKey(type)) {
+ rmClientPending.get(instanceId).put(type, registry
+ .newGauge(type.toString() + "Pending" + instanceId,
+ "Remove pending of " + type + " for " + instanceId, 0L));
+ }
+ }
+ return rmClientPending.get(instanceId).get(type);
+ }
+
+ public void incrClientPending(String instanceId, RequestType type, int diff) {
+ getPendingMetric(instanceId, type).incr(diff);
+ }
+
+ public void decrClientPending(String instanceId, RequestType type, int diff) {
+ getPendingMetric(instanceId, type).decr(diff);
+ }
+
+ @VisibleForTesting
+ protected void setClientPending(String instanceId, RequestType type,
+ int val) {
+ getPendingMetric(instanceId, type).set(val);
+ }
+
+ @VisibleForTesting
+ protected MutableQuantiles getFulfillLatencyMetric(String instanceId,
+ RequestType type) {
+ synchronized (fulfillLatency) {
+ if (!fulfillLatency.containsKey(instanceId)) {
+ fulfillLatency.put(instanceId,
+ new ConcurrentHashMap<RequestType, MutableQuantiles>());
+ }
+ if (!fulfillLatency.get(instanceId).containsKey(type)) {
+ fulfillLatency.get(instanceId).put(type, registry
+ .newQuantiles(type.toString() + "FulfillLatency" + instanceId,
+ "FulfillLatency of " + type + " for " + instanceId, "ops",
+ "latency", 60));
+ }
+ }
+ return fulfillLatency.get(instanceId).get(type);
+ }
+
+ public void addFulfillLatency(String instanceId, RequestType type,
+ long latency) {
+ getFulfillLatencyMetric(instanceId, type).add(latency);
+ }
+
+ public void addFulfillLatency(String instanceId, ContainerUpdateType type,
+ long latency) {
+ switch(type) {
+ case DEMOTE_EXECUTION_TYPE:
+ addFulfillLatency(instanceId, RequestType.Demote, latency);
+ break;
+ case PROMOTE_EXECUTION_TYPE:
+ addFulfillLatency(instanceId, RequestType.Promote, latency);
+ break;
+ default:
+ break;
+ }
+ }
+
+ @VisibleForTesting
+ protected MutableGaugeLong getRequestedQPSMetric(String instanceId,
+ RequestType type) {
+ synchronized (requestedQps) {
+ if (!requestedQps.containsKey(instanceId)) {
+ requestedQps.put(instanceId,
+ new ConcurrentHashMap<RequestType, MutableGaugeLong>());
+ }
+ if (!requestedQps.get(instanceId).containsKey(type)) {
+ requestedQps.get(instanceId)
+ .put(type, registry.newGauge(
+ info(type.toString() + "RequestedOps" + instanceId,
+ "Requested operations of " + type + " for " + instanceId),
+ 0L));
+ }
+ }
+ return requestedQps.get(instanceId).get(type);
+ }
+
+ public void addRequestedQPS(String instanceId, RequestType type,
+ long numEntries) {
+ getRequestedQPSMetric(instanceId, type).incr(numEntries);
+ }
+
+ @VisibleForTesting
+ protected MutableGaugeLong getFulfilledQPSMetric(String instanceId,
+ RequestType type) {
+ synchronized (fulfilledQps) {
+ if (!fulfilledQps.containsKey(instanceId)) {
+ fulfilledQps.put(instanceId,
+ new ConcurrentHashMap<RequestType, MutableGaugeLong>());
+ }
+ if (!fulfilledQps.get(instanceId).containsKey(type)) {
+ fulfilledQps.get(instanceId)
+ .put(type, registry.newGauge(
+ info(type.toString() + "FulfilledOps" + instanceId,
+ "Fulfilled operations of " + type + " for " + instanceId),
+ 0L));
+ }
+ }
+ return fulfilledQps.get(instanceId).get(type);
+ }
+
+ public void addFulfilledQPS(String instanceId, RequestType type,
+ long numEntries) {
+ getFulfilledQPSMetric(instanceId, type).incr(numEntries);
+ }
+
+ public void addFulfilledQPS(String instanceId, ContainerUpdateType type,
+ long latency) {
+ switch(type) {
+ case DEMOTE_EXECUTION_TYPE:
+ addFulfilledQPS(instanceId, RequestType.Demote, latency);
+ break;
+ case PROMOTE_EXECUTION_TYPE:
+ addFulfilledQPS(instanceId, RequestType.Promote, latency);
+ break;
+ default:
+ break;
+ }
+ }
+
+ public void incrClientPending(String scId, ContainerUpdateType type,
+ int diff) {
+ switch(type) {
+ case DEMOTE_EXECUTION_TYPE:
+ incrClientPending(scId, RequestType.Demote, diff);
+ break;
+ case PROMOTE_EXECUTION_TYPE:
+ incrClientPending(scId, RequestType.Promote, diff);
+ break;
+ default:
+ break;
+ }
+ }
+
+ public void decrClientPending(String scId, ContainerUpdateType type,
+ int diff) {
+ switch(type) {
+ case DEMOTE_EXECUTION_TYPE:
+ decrClientPending(scId, RequestType.Demote, diff);
+ break;
+ case PROMOTE_EXECUTION_TYPE:
+ decrClientPending(scId, RequestType.Promote, diff);
+ break;
+ default:
+ break;
+ }
+ }
+
+ @VisibleForTesting
+ protected MutableGaugeLong getRMMasterSlaveSwitchMetric(
+ String instanceId) {
+ synchronized (rmMasterSlaveSwitch) {
+ if (!rmMasterSlaveSwitch.containsKey(instanceId)) {
+ rmMasterSlaveSwitch.put(instanceId, registry.newGauge(
+ info("RMMasterSlaveSwitch" + instanceId,
+ "Number of RM master slave switch"), 0L));
+ }
+ }
+ return rmMasterSlaveSwitch.get(instanceId);
+ }
+
+ public void incrRMMasterSlaveSwitch(String instanceId) {
+ getRMMasterSlaveSwitchMetric(instanceId).incr();
+ }
+
+ @VisibleForTesting
+ protected MutableQuantiles getHeartbeatLatencyMetric(String instanceId) {
+ synchronized (heartbeatLatency) {
+ if (!heartbeatLatency.containsKey(instanceId)) {
+ heartbeatLatency.put(instanceId, registry
+ .newQuantiles("HeartbeatLatency" + instanceId,
+ "HeartbeatLatency for " + instanceId, "ops", "latency", 60));
+ }
+ }
+ return heartbeatLatency.get(instanceId);
+ }
+
+ @VisibleForTesting
+ protected MutableGaugeLong getHeartbeatFailureMetric(
+ String instanceId) {
+ synchronized (heartbeatFailure) {
+ if (!heartbeatFailure.containsKey(instanceId)) {
+ heartbeatFailure.put(instanceId, registry.newGauge(
+ info("HeartbeatFailure" + instanceId,
+ "Number of Heartbeat Failures"), 0L));
+ }
+ }
+ return heartbeatFailure.get(instanceId);
+ }
+
+ public void addHeartbeatFailure(String instanceId, long latency) {
+ getHeartbeatFailureMetric(instanceId).incr();
+
+ getHeartbeatLatencyMetric(instanceId).add(latency);
+ }
+
+ @VisibleForTesting
+ protected MutableGaugeLong getHeartbeatSuccessMetric(
+ String instanceId) {
+ synchronized (heartbeatSuccess) {
+ if (!heartbeatSuccess.containsKey(instanceId)) {
+ heartbeatSuccess.put(instanceId, registry.newGauge(
+ info("HeartbeatSuccess" + instanceId,
+ "Number of Heartbeat"), 0L));
+ }
+ }
+ return heartbeatSuccess.get(instanceId);
+ }
+
+ public void addHeartbeatSuccess(String instanceId, long latency) {
+ getHeartbeatSuccessMetric(instanceId).incr();
+
+ getHeartbeatLatencyMetric(instanceId).add(latency);
+ }
+
+ @Override
+ public void getMetrics(MetricsCollector builder, boolean all) {
+ registry.snapshot(builder.addRecord(registry.info().name()), all);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b9bfdf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/package-info.java
new file mode 100644
index 0000000..a67cf93
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/package-info.java
@@ -0,0 +1,18 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

/**
 * Metrics for YARN server-side components, such as
 * {@code AMRMClientRelayerMetrics} which tracks per-YarnRM pending-request,
 * heartbeat and failover gauges for {@code AMRMClientRelayer}.
 */
package org.apache.hadoop.yarn.server.metrics;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b9bfdf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
index 5f9d81b..d708ced 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
@@ -150,6 +150,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
* @param appNameSuffix application name suffix for the UAM
* @param keepContainersAcrossApplicationAttempts keep container flag for UAM
* recovery.
+ * @param rmName name of the YarnRM
* @see ApplicationSubmissionContext
* #setKeepContainersAcrossApplicationAttempts(boolean)
* @return uamId for the UAM
@@ -159,7 +160,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
public String createAndRegisterNewUAM(
RegisterApplicationMasterRequest registerRequest, Configuration conf,
String queueName, String submitter, String appNameSuffix,
- boolean keepContainersAcrossApplicationAttempts)
+ boolean keepContainersAcrossApplicationAttempts, String rmName)
throws YarnException, IOException {
ApplicationId appId = null;
ApplicationClientProtocol rmClient;
@@ -183,7 +184,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
// Launch the UAM in RM
launchUAM(appId.toString(), conf, appId, queueName, submitter,
- appNameSuffix, keepContainersAcrossApplicationAttempts);
+ appNameSuffix, keepContainersAcrossApplicationAttempts, rmName);
// Register the UAM application
registerApplicationMaster(appId.toString(), registerRequest);
@@ -203,6 +204,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
* @param appNameSuffix application name suffix for the UAM
* @param keepContainersAcrossApplicationAttempts keep container flag for UAM
* recovery.
+ * @param rmName name of the YarnRM
* @see ApplicationSubmissionContext
* #setKeepContainersAcrossApplicationAttempts(boolean)
* @return UAM token
@@ -211,14 +213,15 @@ public class UnmanagedAMPoolManager extends AbstractService {
*/
public Token<AMRMTokenIdentifier> launchUAM(String uamId, Configuration conf,
ApplicationId appId, String queueName, String submitter,
- String appNameSuffix, boolean keepContainersAcrossApplicationAttempts)
- throws YarnException, IOException {
+ String appNameSuffix, boolean keepContainersAcrossApplicationAttempts,
+ String rmName) throws YarnException, IOException {
if (this.unmanagedAppMasterMap.containsKey(uamId)) {
throw new YarnException("UAM " + uamId + " already exists");
}
UnmanagedApplicationManager uam = createUAM(conf, appId, queueName,
- submitter, appNameSuffix, keepContainersAcrossApplicationAttempts);
+ submitter, appNameSuffix, keepContainersAcrossApplicationAttempts,
+ rmName);
// Put the UAM into map first before initializing it to avoid additional UAM
// for the same uamId being created concurrently
this.unmanagedAppMasterMap.put(uamId, uam);
@@ -248,19 +251,20 @@ public class UnmanagedAMPoolManager extends AbstractService {
* @param submitter submitter name of the UAM
* @param appNameSuffix application name suffix for the UAM
* @param uamToken UAM token
+ * @param rmName name of the YarnRM
* @throws YarnException if fails
* @throws IOException if fails
*/
- public void reAttachUAM(String uamId, Configuration conf,
- ApplicationId appId, String queueName, String submitter,
- String appNameSuffix, Token<AMRMTokenIdentifier> uamToken)
+ public void reAttachUAM(String uamId, Configuration conf, ApplicationId appId,
+ String queueName, String submitter, String appNameSuffix,
+ Token<AMRMTokenIdentifier> uamToken, String rmName)
throws YarnException, IOException {
if (this.unmanagedAppMasterMap.containsKey(uamId)) {
throw new YarnException("UAM " + uamId + " already exists");
}
- UnmanagedApplicationManager uam =
- createUAM(conf, appId, queueName, submitter, appNameSuffix, true);
+ UnmanagedApplicationManager uam = createUAM(conf, appId, queueName,
+ submitter, appNameSuffix, true, rmName);
// Put the UAM into map first before initializing it to avoid additional UAM
// for the same uamId being created concurrently
this.unmanagedAppMasterMap.put(uamId, uam);
@@ -287,14 +291,16 @@ public class UnmanagedAMPoolManager extends AbstractService {
* @param submitter submitter name of the application
* @param appNameSuffix application name suffix
* @param keepContainersAcrossApplicationAttempts keep container flag for UAM
+ * @param rmName name of the YarnRM
* @return the UAM instance
*/
@VisibleForTesting
protected UnmanagedApplicationManager createUAM(Configuration conf,
ApplicationId appId, String queueName, String submitter,
- String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
+ String appNameSuffix, boolean keepContainersAcrossApplicationAttempts,
+ String rmName) {
return new UnmanagedApplicationManager(conf, appId, queueName, submitter,
- appNameSuffix, keepContainersAcrossApplicationAttempts);
+ appNameSuffix, keepContainersAcrossApplicationAttempts, rmName);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b9bfdf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
index 78dcfb6..7c1e154 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
@@ -116,13 +116,14 @@ public class UnmanagedApplicationManager {
* @param queueName the queue of the UAM
* @param submitter user name of the app
* @param appNameSuffix the app name suffix to use
* @param keepContainersAcrossApplicationAttempts keep container flag for UAM
* recovery. See {@link ApplicationSubmissionContext
* #setKeepContainersAcrossApplicationAttempts(boolean)}
+ * @param rmName name of the YarnRM
*/
public UnmanagedApplicationManager(Configuration conf, ApplicationId appId,
String queueName, String submitter, String appNameSuffix,
- boolean keepContainersAcrossApplicationAttempts) {
+ boolean keepContainersAcrossApplicationAttempts, String rmName) {
Preconditions.checkNotNull(conf, "Configuration cannot be null");
Preconditions.checkNotNull(appId, "ApplicationId cannot be null");
Preconditions.checkNotNull(submitter, "App submitter cannot be null");
@@ -132,9 +133,11 @@ public class UnmanagedApplicationManager {
this.queueName = queueName;
this.submitter = submitter;
this.appNameSuffix = appNameSuffix;
+ this.userUgi = null;
this.heartbeatHandler =
new AMHeartbeatRequestHandler(this.conf, this.applicationId);
- this.rmProxyRelayer = null;
+ this.rmProxyRelayer =
+ new AMRMClientRelayer(null, this.applicationId, rmName);
this.connectionInitiated = false;
this.registerRequest = null;
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
@@ -186,9 +189,8 @@ public class UnmanagedApplicationManager {
throws IOException {
this.userUgi = UserGroupInformation.createProxyUser(
this.applicationId.toString(), UserGroupInformation.getCurrentUser());
- this.rmProxyRelayer =
- new AMRMClientRelayer(createRMProxy(ApplicationMasterProtocol.class,
- this.conf, this.userUgi, amrmToken), this.applicationId);
+ this.rmProxyRelayer.setRMClient(createRMProxy(
+ ApplicationMasterProtocol.class, this.conf, this.userUgi, amrmToken));
this.heartbeatHandler.setAMRMClientRelayer(this.rmProxyRelayer);
this.heartbeatHandler.setUGI(this.userUgi);
@@ -245,7 +247,7 @@ public class UnmanagedApplicationManager {
this.heartbeatHandler.shutdown();
- if (this.rmProxyRelayer == null) {
+ if (this.userUgi == null) {
if (this.connectionInitiated) {
// This is possible if the async launchUAM is still
// blocked and retrying. Return a dummy response in this case.
@@ -299,7 +301,7 @@ public class UnmanagedApplicationManager {
//
// In case 2, we have already save the allocate request above, so if the
// registration succeed later, no request is lost.
- if (this.rmProxyRelayer == null) {
+ if (this.userUgi == null) {
if (this.connectionInitiated) {
LOG.info("Unmanaged AM still not successfully launched/registered yet."
+ " Saving the allocate request and send later.");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b9bfdf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java
index 4c84f0b..2c016d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java
@@ -140,7 +140,7 @@ public class TestAMRMClientRelayer {
this.conf = new Configuration();
this.mockAMS = new MockApplicationMasterService();
- this.relayer = new AMRMClientRelayer(this.mockAMS, null);
+ this.relayer = new AMRMClientRelayer(this.mockAMS, null, "TEST");
this.relayer.init(conf);
this.relayer.start();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b9bfdf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/metrics/TestAMRMClientRelayerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/metrics/TestAMRMClientRelayerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/metrics/TestAMRMClientRelayerMetrics.java
new file mode 100644
index 0000000..ebbfae2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/metrics/TestAMRMClientRelayerMetrics.java
@@ -0,0 +1,513 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.metrics;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.AMRMClientRelayer;
+import org.apache.hadoop.yarn.server.metrics.AMRMClientRelayerMetrics.RequestType;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
/**
 * Unit tests for {@code AMRMClientRelayerMetrics}: verifies that the
 * per-YarnRM pending, heartbeat success/failure and RM-failover gauges track
 * the allocate traffic flowing through {@link AMRMClientRelayer} instances.
 */
public class TestAMRMClientRelayerMetrics {

  /**
   * Mock AMS for easier testing and mocking of request/responses.
   */
  public static class MockApplicationMasterService
      implements ApplicationMasterProtocol {

    // One-shot fault flags: the next allocate/finish call that observes a
    // flag resets it to false before throwing.
    private boolean failover = false;
    private boolean exception = false;
    // Captures of the most recent allocate request, available for debugging
    // and future assertions (not read by the tests below).
    private List<ResourceRequest> lastAsk;
    private List<ContainerId> lastRelease;
    private List<UpdateContainerRequest> lastUpdates;
    private List<String> lastBlacklistAdditions;
    private List<String> lastBlacklistRemovals;
    // Canned response returned from allocate(); tests mutate its updated /
    // completed container lists to simulate RM behavior.
    private AllocateResponse response = AllocateResponse
        .newInstance(0, null, null, new ArrayList<NodeReport>(),
            Resource.newInstance(0, 0), null, 0, null, null);

    @Override
    public RegisterApplicationMasterResponse registerApplicationMaster(
        RegisterApplicationMasterRequest request)
        throws YarnException, IOException {
      return null;
    }

    @Override
    public FinishApplicationMasterResponse finishApplicationMaster(
        FinishApplicationMasterRequest request)
        throws YarnException, IOException {
      if (this.failover) {
        this.failover = false;
        throw new ApplicationMasterNotRegisteredException("Mock RM restarted");
      }
      return null;
    }

    @Override
    public AllocateResponse allocate(AllocateRequest request)
        throws YarnException, IOException {
      // Failover is checked first: when both flags are set, the first call
      // throws NotRegistered (triggering re-register + retry) and the retry
      // then hits the generic exception. testFailover relies on this order.
      if (this.failover) {
        this.failover = false;
        throw new ApplicationMasterNotRegisteredException("Mock RM restarted");
      }
      if (this.exception) {
        this.exception = false;
        throw new YarnException("Mock RM encountered exception");
      }
      this.lastAsk = request.getAskList();
      this.lastRelease = request.getReleaseList();
      this.lastUpdates = request.getUpdateRequests();
      this.lastBlacklistAdditions =
          request.getResourceBlacklistRequest().getBlacklistAdditions();
      this.lastBlacklistRemovals =
          request.getResourceBlacklistRequest().getBlacklistRemovals();
      return response;
    }

    public void setFailoverFlag() {
      this.failover = true;
    }
  }

  private Configuration conf;
  private MockApplicationMasterService mockAMS;
  // Two relayers sharing one mock AMS, registered under distinct YarnRM ids
  // so the tests can verify metrics are kept separate per instance.
  private String homeID = "home";
  private AMRMClientRelayer homeRelayer;
  private String uamID = "uam";
  private AMRMClientRelayer uamRelayer;

  // Building blocks of the next allocate request; see getAllocateRequest().
  private List<ResourceRequest> asks = new ArrayList<>();
  private List<ContainerId> releases = new ArrayList<>();
  private List<UpdateContainerRequest> updates = new ArrayList<>();
  private List<String> blacklistAdditions = new ArrayList<>();
  private List<String> blacklistRemoval = new ArrayList<>();

  @Before
  public void setup() throws YarnException, IOException {
    this.conf = new Configuration();

    this.mockAMS = new MockApplicationMasterService();

    this.homeRelayer = new AMRMClientRelayer(this.mockAMS,
        ApplicationId.newInstance(0, 0), this.homeID);
    this.homeRelayer.init(conf);
    this.homeRelayer.start();

    this.homeRelayer.registerApplicationMaster(
        RegisterApplicationMasterRequest.newInstance("", 0, ""));

    this.uamRelayer = new AMRMClientRelayer(this.mockAMS,
        ApplicationId.newInstance(0, 0), this.uamID);
    this.uamRelayer.init(conf);
    this.uamRelayer.start();

    this.uamRelayer.registerApplicationMaster(
        RegisterApplicationMasterRequest.newInstance("", 0, ""));

    clearAllocateRequestLists();

    // The metrics singleton survives across tests; zero out every pending
    // gauge so each test starts from a known baseline.
    AMRMClientRelayerMetrics.getInstance()
        .setClientPending(homeID, RequestType.Guaranteed, 0);
    AMRMClientRelayerMetrics.getInstance()
        .setClientPending(homeID, RequestType.Opportunistic, 0);
    AMRMClientRelayerMetrics.getInstance()
        .setClientPending(homeID, RequestType.Promote, 0);
    AMRMClientRelayerMetrics.getInstance()
        .setClientPending(homeID, RequestType.Demote, 0);

    AMRMClientRelayerMetrics.getInstance()
        .setClientPending(uamID, RequestType.Guaranteed, 0);
    AMRMClientRelayerMetrics.getInstance()
        .setClientPending(uamID, RequestType.Opportunistic, 0);
    AMRMClientRelayerMetrics.getInstance()
        .setClientPending(uamID, RequestType.Promote, 0);
    AMRMClientRelayerMetrics.getInstance()
        .setClientPending(uamID, RequestType.Demote, 0);
  }

  private AllocateRequest getAllocateRequest() {
    // Need to create a new one every time because rather than directly
    // referring the lists, the protobuf impl makes a copy of the lists.
    // NOTE(review): asks is passed by reference while the other lists are
    // defensively copied — the protobuf copy makes this safe, but wrapping
    // asks too would be more consistent.
    return AllocateRequest.newBuilder()
        .responseId(0)
        .progress(0).askList(asks)
        .releaseList(new ArrayList<>(this.releases))
        .resourceBlacklistRequest(ResourceBlacklistRequest.newInstance(
            new ArrayList<>(this.blacklistAdditions),
            new ArrayList<>(this.blacklistRemoval)))
        .updateRequests(new ArrayList<>(this.updates))
        .build();
  }

  // Resets all request-building lists between allocate rounds.
  private void clearAllocateRequestLists() {
    this.asks.clear();
    this.releases.clear();
    this.updates.clear();
    this.blacklistAdditions.clear();
    this.blacklistRemoval.clear();
  }

  // Builds a promote-to-GUARANTEED update request for container id.
  private static UpdateContainerRequest createPromote(int id) {
    return UpdateContainerRequest.newInstance(0, createContainerId(id),
        ContainerUpdateType.PROMOTE_EXECUTION_TYPE, Resource.newInstance(0, 0),
        ExecutionType.GUARANTEED);
  }

  // Builds a demote-to-OPPORTUNISTIC update request for container id.
  private static UpdateContainerRequest createDemote(int id) {
    return UpdateContainerRequest.newInstance(0, createContainerId(id),
        ContainerUpdateType.DEMOTE_EXECUTION_TYPE, Resource.newInstance(0, 0),
        ExecutionType.OPPORTUNISTIC);
  }

  // Deterministic container id within a fixed app attempt, so that two calls
  // with the same id compare equal (relied on by List.remove in tests).
  private static ContainerId createContainerId(int id) {
    return ContainerId.newContainerId(
        ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
        id);
  }

  /**
   * Builds a ResourceRequest with the given allocation id, locality,
   * capability, priority, execution type and container count.
   */
  public ResourceRequest createResourceRequest(long id, String resource,
      int memory, int vCores, int priority, ExecutionType execType,
      int containers) {
    ResourceRequest req = Records.newRecord(ResourceRequest.class);
    req.setAllocationRequestId(id);
    req.setResourceName(resource);
    req.setCapability(Resource.newInstance(memory, vCores));
    req.setPriority(Priority.newInstance(priority));
    req.setExecutionTypeRequest(ExecutionTypeRequest.newInstance(execType));
    req.setNumContainers(containers);
    return req;
  }

  /**
   * Pending-guaranteed gauge follows the ANY ask count per relayer, and the
   * two relayers' gauges are independent.
   */
  @Test
  public void testGPending() throws YarnException, IOException {
    // Ask for two containers, one with location preference
    this.asks.add(
        createResourceRequest(0, "node", 2048, 1, 1, ExecutionType.GUARANTEED,
            1));
    this.asks.add(
        createResourceRequest(0, "rack", 2048, 1, 1, ExecutionType.GUARANTEED,
            1));
    this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1,
        ExecutionType.GUARANTEED, 2));
    this.homeRelayer.allocate(getAllocateRequest());

    Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Guaranteed).value());

    Assert.assertEquals(0, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(uamID, RequestType.Guaranteed).value());

    // Ask from the uam
    this.uamRelayer.allocate(getAllocateRequest());

    Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Guaranteed).value());

    Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(uamID, RequestType.Guaranteed).value());

    // Update the any to ask for an extra container
    this.asks.get(2).setNumContainers(3);
    this.homeRelayer.allocate(getAllocateRequest());

    Assert.assertEquals(3, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Guaranteed).value());

    Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(uamID, RequestType.Guaranteed).value());

    // Update the any to ask to pretend a container was allocated
    this.asks.get(2).setNumContainers(2);
    this.homeRelayer.allocate(getAllocateRequest());

    Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Guaranteed).value());

    Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(uamID, RequestType.Guaranteed).value());
  }

  /**
   * Pending-promote gauge is decremented when a promote is superseded by a
   * demote, fulfilled by the RM, or the container completes.
   */
  @Test
  public void testPromotePending() throws YarnException, IOException {
    // Ask to promote 3 containers
    this.updates.add(createPromote(1));
    this.updates.add(createPromote(2));
    this.updates.add(createPromote(3));

    this.homeRelayer.allocate(getAllocateRequest());

    Assert.assertEquals(3, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Promote).value());

    // Demote 2 containers, one of which is pending promote
    this.updates.remove(createPromote(3));
    this.updates.add(createDemote(3));
    this.updates.add(createDemote(4));

    this.homeRelayer.allocate(getAllocateRequest());

    Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Promote).value());

    // Let the RM respond with two successful promotions, one of which
    // was pending promote
    List<UpdatedContainer> updated = new ArrayList<>();
    updated.add(UpdatedContainer
        .newInstance(ContainerUpdateType.PROMOTE_EXECUTION_TYPE, Container
            .newInstance(createContainerId(2), null, null, null, null, null)));
    updated.add(UpdatedContainer
        .newInstance(ContainerUpdateType.PROMOTE_EXECUTION_TYPE, Container
            .newInstance(createContainerId(5), null, null, null, null, null)));
    this.mockAMS.response.setUpdatedContainers(updated);

    this.homeRelayer.allocate(getAllocateRequest());

    Assert.assertEquals(1, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Promote).value());

    // Remove the promoted container and clean up response
    this.mockAMS.response.getUpdatedContainers().clear();
    this.updates.remove(createPromote(2));

    // Let the RM respond with two completed containers, one of which was
    // pending promote
    List<ContainerStatus> completed = new ArrayList<>();
    completed
        .add(ContainerStatus.newInstance(createContainerId(1), null, "", 0));
    completed
        .add(ContainerStatus.newInstance(createContainerId(6), null, "", 0));
    this.mockAMS.response.setCompletedContainersStatuses(completed);

    this.homeRelayer.allocate(getAllocateRequest());

    Assert.assertEquals(0, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Promote).value());
  }

  /**
   * Relayer shutdown resets all of its pending gauges to zero.
   */
  @Test
  public void testCleanUpOnFinish() throws YarnException, IOException {
    // Ask for two containers, one with location preference
    this.asks.add(
        createResourceRequest(0, "node", 2048, 1, 1, ExecutionType.GUARANTEED,
            1));
    this.asks.add(
        createResourceRequest(0, "rack", 2048, 1, 1, ExecutionType.GUARANTEED,
            1));
    this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1,
        ExecutionType.GUARANTEED, 2));

    // Ask to promote 3 containers
    this.updates.add(createPromote(1));
    this.updates.add(createPromote(2));
    this.updates.add(createPromote(3));

    // Run the allocate call to start tracking pending
    this.homeRelayer.allocate(getAllocateRequest());

    // After finish, the metrics should reset to zero
    this.homeRelayer.shutdown();

    Assert.assertEquals(0, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Guaranteed).value());

    Assert.assertEquals(0, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Promote).value());
  }

  /**
   * Failover (NotRegistered) bumps the master-slave-switch gauge and still
   * counts one heartbeat success after the re-register + retry; a plain
   * exception bumps only the failure gauge; pending stays consistent
   * throughout.
   */
  @Test
  public void testFailover() throws YarnException, IOException {
    // Ask for two containers, one with location preference
    this.asks.add(
        createResourceRequest(0, "node", 2048, 1, 1, ExecutionType.GUARANTEED,
            1));
    this.asks.add(
        createResourceRequest(0, "rack", 2048, 1, 1, ExecutionType.GUARANTEED,
            1));
    this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1,
        ExecutionType.GUARANTEED, 2));

    long previousSuccess = AMRMClientRelayerMetrics.getInstance()
        .getHeartbeatSuccessMetric(homeID).value();
    long previousFailover = AMRMClientRelayerMetrics.getInstance()
        .getRMMasterSlaveSwitchMetric(homeID).value();
    // Set failover to trigger
    mockAMS.failover = true;
    this.homeRelayer.allocate(getAllocateRequest());
    // The failover metric should be incremented
    Assert.assertEquals(++previousFailover,
        AMRMClientRelayerMetrics.getInstance()
            .getRMMasterSlaveSwitchMetric(homeID).value());

    // The success metric should be incremented once
    Assert.assertEquals(++previousSuccess,
        AMRMClientRelayerMetrics.getInstance()
            .getHeartbeatSuccessMetric(homeID).value());

    Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Guaranteed).value());

    Assert.assertEquals(0, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(uamID, RequestType.Guaranteed).value());

    // Ask from the uam
    this.uamRelayer.allocate(getAllocateRequest());

    Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Guaranteed).value());

    Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(uamID, RequestType.Guaranteed).value());

    // Update the any to ask for an extra container
    this.asks.get(2).setNumContainers(3);
    mockAMS.failover = true;
    this.homeRelayer.allocate(getAllocateRequest());
    // The failover metric should be incremented
    Assert.assertEquals(++previousFailover,
        AMRMClientRelayerMetrics.getInstance()
            .getRMMasterSlaveSwitchMetric(homeID).value());

    // The success metric should be incremented once
    Assert.assertEquals(++previousSuccess,
        AMRMClientRelayerMetrics.getInstance()
            .getHeartbeatSuccessMetric(homeID).value());

    Assert.assertEquals(3, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Guaranteed).value());

    Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(uamID, RequestType.Guaranteed).value());

    // Update the any to ask to pretend a container was allocated
    this.asks.get(2).setNumContainers(2);
    mockAMS.failover = true;
    this.homeRelayer.allocate(getAllocateRequest());
    // The failover metric should be incremented
    Assert.assertEquals(++previousFailover,
        AMRMClientRelayerMetrics.getInstance()
            .getRMMasterSlaveSwitchMetric(homeID).value());

    // The success metric should be incremented once
    Assert.assertEquals(++previousSuccess,
        AMRMClientRelayerMetrics.getInstance()
            .getHeartbeatSuccessMetric(homeID).value());

    Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Guaranteed).value());

    Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(uamID, RequestType.Guaranteed).value());

    long previousFailure = AMRMClientRelayerMetrics.getInstance()
        .getHeartbeatFailureMetric(homeID).value();

    mockAMS.exception = true;
    try {
      this.homeRelayer.allocate(getAllocateRequest());
      Assert.fail();
    } catch (YarnException e) {
      // expected: the mock throws and the relayer must surface it
    }
    // The failover metric should not be incremented
    Assert.assertEquals(previousFailover,
        AMRMClientRelayerMetrics.getInstance()
            .getRMMasterSlaveSwitchMetric(homeID).value());

    // The success metric should not be incremented
    Assert.assertEquals(previousSuccess,
        AMRMClientRelayerMetrics.getInstance()
            .getHeartbeatSuccessMetric(homeID).value());

    // The failure metric should be incremented
    Assert.assertEquals(++previousFailure,
        AMRMClientRelayerMetrics.getInstance()
            .getHeartbeatFailureMetric(homeID).value());

    // Failover followed by an exception on the retry: counts one failover
    // and one failure, no success.
    mockAMS.failover = true;
    mockAMS.exception = true;
    try {
      this.homeRelayer.allocate(getAllocateRequest());
      Assert.fail();
    } catch (YarnException e) {
      // expected: retry after failover hits the mock exception
    }
    // The failover metric should be incremented
    Assert.assertEquals(++previousFailover,
        AMRMClientRelayerMetrics.getInstance()
            .getRMMasterSlaveSwitchMetric(homeID).value());

    // The success metric should not be incremented
    Assert.assertEquals(previousSuccess,
        AMRMClientRelayerMetrics.getInstance()
            .getHeartbeatSuccessMetric(homeID).value());

    // The failure metric should be incremented
    Assert.assertEquals(++previousFailure,
        AMRMClientRelayerMetrics.getInstance()
            .getHeartbeatFailureMetric(homeID).value());
  }

  /**
   * A brand-new ask with zero containers must not register any pending.
   */
  @Test
  public void testNewEmptyRequest()
      throws YarnException, IOException {
    // Ask for zero containers
    this.asks.add(createResourceRequest(1, ResourceRequest.ANY, 2048, 1, 1,
        ExecutionType.GUARANTEED, 0));
    this.homeRelayer.allocate(getAllocateRequest());

    Assert.assertEquals(0, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Guaranteed).value());

    Assert.assertEquals(0, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(uamID, RequestType.Guaranteed).value());
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b9bfdf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
index 5848d3f..b6bb0da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
@@ -374,7 +374,7 @@ public class TestUnmanagedApplicationManager {
ApplicationId appId, String queueName, String submitter,
String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
super(conf, appId, queueName, submitter, appNameSuffix,
- keepContainersAcrossApplicationAttempts);
+ keepContainersAcrossApplicationAttempts, "TEST");
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b9bfdf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index eb818f1..1bf882f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -254,7 +254,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.homeSubClusterId =
SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext,
- ApplicationMasterProtocol.class, this.appOwner), appId);
+ ApplicationMasterProtocol.class, this.appOwner), appId,
+ this.homeSubClusterId.toString());
this.federationFacade = FederationStateStoreFacade.getInstance();
this.subClusterResolver = this.federationFacade.getSubClusterResolver();
@@ -340,7 +341,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.attemptId.getApplicationId(),
this.amRegistrationResponse.getQueue(),
getApplicationContext().getUser(), this.homeSubClusterId.getId(),
- entry.getValue());
+ entry.getValue(), subClusterId.toString());
this.secondaryRelayers.put(subClusterId.getId(),
this.uamPool.getAMRMClientRelayer(subClusterId.getId()));
@@ -666,7 +667,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
uamPool.finishApplicationMaster(subClusterId, finishRequest);
if (uamResponse.getIsUnregistered()) {
- secondaryRelayers.remove(subClusterId);
+ AMRMClientRelayer relayer =
+ secondaryRelayers.remove(subClusterId);
+ if(relayer != null) {
+ relayer.shutdown();
+ }
if (getNMStateStore() != null) {
getNMStateStore().removeAMRMProxyAppContextEntry(attemptId,
@@ -753,6 +758,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
this.threadpool = null;
}
+ homeRMRelayer.shutdown();
+ for(AMRMClientRelayer relayer : secondaryRelayers.values()){
+ relayer.shutdown();
+ }
super.shutdown();
}
@@ -885,7 +894,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
uamPool.reAttachUAM(subClusterId.getId(), config, appId,
amRegistrationResponse.getQueue(),
getApplicationContext().getUser(), homeSubClusterId.getId(),
- amrmToken);
+ amrmToken, subClusterId.toString());
secondaryRelayers.put(subClusterId.getId(),
uamPool.getAMRMClientRelayer(subClusterId.getId()));
@@ -1136,7 +1145,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
token = uamPool.launchUAM(subClusterId, config,
attemptId.getApplicationId(),
amRegistrationResponse.getQueue(), appContext.getUser(),
- homeSubClusterId.toString(), true);
+ homeSubClusterId.toString(), true, subClusterId);
secondaryRelayers.put(subClusterId,
uamPool.getAMRMClientRelayer(subClusterId));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b9bfdf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
index 1088c69..33617d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
@@ -122,7 +122,8 @@ public class TestableFederationInterceptor extends FederationInterceptor {
@Override
public UnmanagedApplicationManager createUAM(Configuration conf,
ApplicationId appId, String queueName, String submitter,
- String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
+ String appNameSuffix, boolean keepContainersAcrossApplicationAttempts,
+ String rmId) {
return new TestableUnmanagedApplicationManager(conf, appId, queueName,
submitter, appNameSuffix, keepContainersAcrossApplicationAttempts);
}
@@ -139,7 +140,7 @@ public class TestableFederationInterceptor extends FederationInterceptor {
ApplicationId appId, String queueName, String submitter,
String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
super(conf, appId, queueName, submitter, appNameSuffix,
- keepContainersAcrossApplicationAttempts);
+ keepContainersAcrossApplicationAttempts, "TEST");
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org