You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2017/02/14 14:10:27 UTC
hadoop git commit: YARN-5966. AMRMClient changes to support
ExecutionType update. (asuresh)
Repository: hadoop
Updated Branches:
refs/heads/trunk 4164a2032 -> aaf106fde
YARN-5966. AMRMClient changes to support ExecutionType update. (asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/aaf106fd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/aaf106fd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/aaf106fd
Branch: refs/heads/trunk
Commit: aaf106fde35ec97e2e2ea4d7a67434038c4273ac
Parents: 4164a20
Author: Arun Suresh <as...@apache.org>
Authored: Tue Feb 14 06:08:27 2017 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Tue Feb 14 06:09:10 2017 -0800
----------------------------------------------------------------------
.../yarn/api/records/UpdateContainerError.java | 19 +-
.../src/main/proto/yarn_service_protos.proto | 1 +
.../hadoop/yarn/client/api/AMRMClient.java | 33 +-
.../yarn/client/api/async/AMRMClientAsync.java | 33 +-
.../api/async/impl/AMRMClientAsyncImpl.java | 7 +-
.../yarn/client/api/impl/AMRMClientImpl.java | 111 +++--
.../yarn/client/api/impl/TestAMRMClient.java | 60 ++-
.../api/impl/TestAMRMClientOnRMRestart.java | 8 +-
.../TestOpportunisticContainerAllocation.java | 400 +++++++++++++++++--
.../impl/pb/UpdateContainerErrorPBImpl.java | 16 +
.../server/resourcemanager/RMServerUtils.java | 14 +-
...pportunisticContainerAllocatorAMService.java | 5 +-
.../capacity/TestIncreaseAllocationExpirer.java | 4 +-
13 files changed, 587 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java
index e7458cf..4d184cb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java
@@ -59,6 +59,22 @@ public abstract class UpdateContainerError {
public abstract void setReason(String reason);
/**
+ * Get current container version.
+ * @return Current container Version.
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Unstable
+ public abstract int getCurrentContainerVersion();
+
+ /**
+ * Set current container version.
+ * @param currentVersion Current container version.
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Unstable
+ public abstract void setCurrentContainerVersion(int currentVersion);
+
+ /**
* Get the {@code UpdateContainerRequest} that was not satisfiable.
* @return UpdateContainerRequest
*/
@@ -89,6 +105,7 @@ public abstract class UpdateContainerError {
@Override
public String toString() {
return "UpdateContainerError{reason=" + getReason() + ", "
+ + "currentVersion=" + getCurrentContainerVersion() + ", "
+ "req=" + getUpdateContainerRequest() + "}";
}
@@ -120,6 +137,6 @@ public abstract class UpdateContainerError {
} else if (!req.equals(other.getUpdateContainerRequest())) {
return false;
}
- return true;
+ return getCurrentContainerVersion() == other.getCurrentContainerVersion();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index df3c852..c6647c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -78,6 +78,7 @@ message UpdateContainerRequestProto {
message UpdateContainerErrorProto {
optional string reason = 1;
optional UpdateContainerRequestProto update_request = 2;
+ optional int32 current_container_version = 3;
}
message AllocateRequestProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
index 52155f5..15d0065 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
@@ -33,17 +33,20 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.yarn.util.resource.Resources;
@InterfaceAudience.Public
@InterfaceStability.Stable
@@ -518,12 +521,38 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
* ResourceManager to change the existing resource allocation to the target
* resource allocation.
*
+ * @deprecated use
+ * {@link #requestContainerUpdate(Container, UpdateContainerRequest)}
+ *
* @param container The container returned from the last successful resource
* allocation or resource change
* @param capability The target resource capability of the container
*/
- public abstract void requestContainerResourceChange(
- Container container, Resource capability);
+ @Deprecated
+ public void requestContainerResourceChange(
+ Container container, Resource capability) {
+ Preconditions.checkNotNull(container, "Container cannot be null!!");
+ Preconditions.checkNotNull(capability,
+ "UpdateContainerRequest cannot be null!!");
+ requestContainerUpdate(container, UpdateContainerRequest.newInstance(
+ container.getVersion(), container.getId(),
+ Resources.fitsIn(capability, container.getResource()) ?
+ ContainerUpdateType.DECREASE_RESOURCE :
+ ContainerUpdateType.INCREASE_RESOURCE,
+ capability, null));
+ }
+
+ /**
+ * Request a container update before calling <code>allocate</code>.
+ * Any previous pending update request of the same container will be
+ * removed.
+ *
+ * @param container The container returned from the last successful resource
+ * allocation or update
+ * @param updateContainerRequest The <code>UpdateContainerRequest</code>.
+ */
+ public abstract void requestContainerUpdate(
+ Container container, UpdateContainerRequest updateContainerRequest);
/**
* Release containers assigned by the Resource Manager. If the app cannot use
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
index d2195a6..4cb27cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
@@ -36,11 +36,13 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.util.resource.Resources;
/**
* <code>AMRMClientAsync</code> handles communication with the ResourceManager
@@ -284,12 +287,38 @@ extends AbstractService {
* ResourceManager to change the existing resource allocation to the target
* resource allocation.
*
+ * @deprecated use
+ * {@link #requestContainerUpdate(Container, UpdateContainerRequest)}
+ *
* @param container The container returned from the last successful resource
* allocation or resource change
* @param capability The target resource capability of the container
*/
- public abstract void requestContainerResourceChange(
- Container container, Resource capability);
+ @Deprecated
+ public void requestContainerResourceChange(
+ Container container, Resource capability) {
+ Preconditions.checkNotNull(container, "Container cannot be null!!");
+ Preconditions.checkNotNull(capability,
+ "UpdateContainerRequest cannot be null!!");
+ requestContainerUpdate(container, UpdateContainerRequest.newInstance(
+ container.getVersion(), container.getId(),
+ Resources.fitsIn(capability, container.getResource()) ?
+ ContainerUpdateType.DECREASE_RESOURCE :
+ ContainerUpdateType.INCREASE_RESOURCE,
+ capability, null));
+ }
+
+ /**
+ * Request a container update before calling <code>allocate</code>.
+ * Any previous pending update request of the same container will be
+ * removed.
+ *
+ * @param container The container returned from the last successful resource
+ * allocation or update
+ * @param updateContainerRequest The <code>UpdateContainerRequest</code>.
+ */
+ public abstract void requestContainerUpdate(
+ Container container, UpdateContainerRequest updateContainerRequest);
/**
* Release containers assigned by the Resource Manager. If the app cannot use
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index 3e72d3f..9e2c0e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@@ -207,9 +208,9 @@ extends AMRMClientAsync<T> {
}
@Override
- public void requestContainerResourceChange(
- Container container, Resource capability) {
- client.requestContainerResourceChange(container, capability);
+ public void requestContainerUpdate(Container container,
+ UpdateContainerRequest updateContainerRequest) {
+ client.requestContainerUpdate(container, updateContainerRequest);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
index 44fc1e0..7da91de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
@@ -169,15 +169,16 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();
// change map holds container resource change requests between two allocate()
// calls, and are cleared after each successful allocate() call.
- protected final Map<ContainerId, SimpleEntry<Container, Resource>> change =
- new HashMap<>();
+ protected final Map<ContainerId,
+ SimpleEntry<Container, UpdateContainerRequest>> change = new HashMap<>();
// pendingChange map holds history of container resource change requests in
// case AM needs to reregister with the ResourceManager.
// Change requests are removed from this map if RM confirms the change
// through allocate response, or if RM confirms that the container has been
// completed.
- protected final Map<ContainerId, SimpleEntry<Container, Resource>>
- pendingChange = new HashMap<>();
+ protected final Map<ContainerId,
+ SimpleEntry<Container, UpdateContainerRequest>> pendingChange =
+ new HashMap<>();
public AMRMClientImpl() {
super(AMRMClientImpl.class.getName());
@@ -259,7 +260,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
AllocateRequest allocateRequest = null;
List<String> blacklistToAdd = new ArrayList<String>();
List<String> blacklistToRemove = new ArrayList<String>();
- Map<ContainerId, SimpleEntry<Container, Resource>> oldChange =
+ Map<ContainerId, SimpleEntry<Container, UpdateContainerRequest>> oldChange =
new HashMap<>();
try {
synchronized (this) {
@@ -374,14 +375,15 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
//
// Only insert entries from the cached oldChange map
// that do not exist in the current change map:
- for (Map.Entry<ContainerId, SimpleEntry<Container, Resource>> entry :
+ for (Map.Entry<ContainerId,
+ SimpleEntry<Container, UpdateContainerRequest>> entry :
oldChange.entrySet()) {
ContainerId oldContainerId = entry.getKey();
Container oldContainer = entry.getValue().getKey();
- Resource oldResource = entry.getValue().getValue();
+ UpdateContainerRequest oldupdate = entry.getValue().getValue();
if (change.get(oldContainerId) == null) {
change.put(
- oldContainerId, new SimpleEntry<>(oldContainer, oldResource));
+ oldContainerId, new SimpleEntry<>(oldContainer, oldupdate));
}
}
blacklistAdditions.addAll(blacklistToAdd);
@@ -394,19 +396,17 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
private List<UpdateContainerRequest> createUpdateList() {
List<UpdateContainerRequest> updateList = new ArrayList<>();
- for (Map.Entry<ContainerId, SimpleEntry<Container, Resource>> entry :
- change.entrySet()) {
- Resource targetCapability = entry.getValue().getValue();
- Resource currCapability = entry.getValue().getKey().getResource();
- int version = entry.getValue().getKey().getVersion();
+ for (Map.Entry<ContainerId, SimpleEntry<Container,
+ UpdateContainerRequest>> entry : change.entrySet()) {
+ Resource targetCapability = entry.getValue().getValue().getCapability();
+ ExecutionType targetExecType =
+ entry.getValue().getValue().getExecutionType();
ContainerUpdateType updateType =
- ContainerUpdateType.INCREASE_RESOURCE;
- if (Resources.fitsIn(targetCapability, currCapability)) {
- updateType = ContainerUpdateType.DECREASE_RESOURCE;
- }
+ entry.getValue().getValue().getContainerUpdateType();
+ int version = entry.getValue().getKey().getVersion();
updateList.add(
UpdateContainerRequest.newInstance(version, entry.getKey(),
- updateType, targetCapability, null));
+ updateType, targetCapability, targetExecType));
}
return updateList;
}
@@ -591,21 +591,47 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
}
@Override
- public synchronized void requestContainerResourceChange(
- Container container, Resource capability) {
- validateContainerResourceChangeRequest(
- container.getId(), container.getResource(), capability);
+ public synchronized void requestContainerUpdate(
+ Container container, UpdateContainerRequest updateContainerRequest) {
+ Preconditions.checkNotNull(container, "Container cannot be null!!");
+ Preconditions.checkNotNull(updateContainerRequest,
+ "UpdateContainerRequest cannot be null!!");
+ LOG.info("Requesting Container update : " +
+ "container=" + container + ", " +
+ "updateType=" + updateContainerRequest.getContainerUpdateType() + ", " +
+ "targetCapability=" + updateContainerRequest.getCapability() + ", " +
+ "targetExecType=" + updateContainerRequest.getExecutionType());
+ if (updateContainerRequest.getCapability() != null &&
+ updateContainerRequest.getExecutionType() == null) {
+ validateContainerResourceChangeRequest(
+ updateContainerRequest.getContainerUpdateType(),
+ container.getId(), container.getResource(),
+ updateContainerRequest.getCapability());
+ } else if (updateContainerRequest.getExecutionType() != null &&
+ updateContainerRequest.getCapability() == null) {
+ validateContainerExecTypeChangeRequest(
+ updateContainerRequest.getContainerUpdateType(),
+ container.getId(), container.getExecutionType(),
+ updateContainerRequest.getExecutionType());
+ } else if (updateContainerRequest.getExecutionType() == null &&
+ updateContainerRequest.getCapability() == null) {
+ throw new IllegalArgumentException("Both target Capability and" +
+ "target Execution Type are null");
+ } else {
+ throw new IllegalArgumentException("Support currently exists only for" +
+ " EITHER update of Capability OR update of Execution Type NOT both");
+ }
if (change.get(container.getId()) == null) {
change.put(container.getId(),
- new SimpleEntry<>(container, capability));
+ new SimpleEntry<>(container, updateContainerRequest));
} else {
- change.get(container.getId()).setValue(capability);
+ change.get(container.getId()).setValue(updateContainerRequest);
}
if (pendingChange.get(container.getId()) == null) {
pendingChange.put(container.getId(),
- new SimpleEntry<>(container, capability));
+ new SimpleEntry<>(container, updateContainerRequest));
} else {
- pendingChange.get(container.getId()).setValue(capability);
+ pendingChange.get(container.getId()).setValue(updateContainerRequest);
}
}
@@ -755,7 +781,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
}
private void validateContainerResourceChangeRequest(
- ContainerId containerId, Resource original, Resource target) {
+ ContainerUpdateType updateType, ContainerId containerId,
+ Resource original, Resource target) {
Preconditions.checkArgument(containerId != null,
"ContainerId cannot be null");
Preconditions.checkArgument(original != null,
@@ -768,6 +795,36 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
Preconditions.checkArgument(!Resources.equals(Resources.none(), target)
&& Resources.fitsIn(Resources.none(), target),
"Target resource capability must be greater than 0");
+ if (ContainerUpdateType.DECREASE_RESOURCE == updateType) {
+ Preconditions.checkArgument(Resources.fitsIn(target, original),
+ "Target resource capability must fit in Original capability");
+ } else {
+ Preconditions.checkArgument(Resources.fitsIn(original, target),
+ "Target resource capability must be more than Original capability");
+
+ }
+ }
+
+ private void validateContainerExecTypeChangeRequest(
+ ContainerUpdateType updateType, ContainerId containerId,
+ ExecutionType original, ExecutionType target) {
+ Preconditions.checkArgument(containerId != null,
+ "ContainerId cannot be null");
+ Preconditions.checkArgument(original != null,
+ "Original Execution Type cannot be null");
+ Preconditions.checkArgument(target != null,
+ "Target Execution Type cannot be null");
+ if (ContainerUpdateType.DEMOTE_EXECUTION_TYPE == updateType) {
+ Preconditions.checkArgument(target == ExecutionType.OPPORTUNISTIC
+ && original == ExecutionType.GUARANTEED,
+ "Incorrect Container update request, target should be" +
+ " OPPORTUNISTIC and original should be GUARANTEED");
+ } else {
+ Preconditions.checkArgument(target == ExecutionType.GUARANTEED
+ && original == ExecutionType.OPPORTUNISTIC,
+ "Incorrect Container update request, target should be" +
+ " GUARANTEED and original should be OPPORTUNISTIC");
+ }
}
private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
index 8b1bbc7..4f73bac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
@@ -51,29 +51,7 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.NMToken;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.api.records.UpdatedContainer;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@@ -1058,26 +1036,36 @@ public class TestAMRMClient {
(AMRMClientImpl<ContainerRequest>) amClient;
Assert.assertEquals(0, amClientImpl.change.size());
// verify newer request overwrites older request for the container1
- amClientImpl.requestContainerResourceChange(
- container1, Resource.newInstance(2048, 1));
- amClientImpl.requestContainerResourceChange(
- container1, Resource.newInstance(4096, 1));
+ amClientImpl.requestContainerUpdate(container1,
+ UpdateContainerRequest.newInstance(container1.getVersion(),
+ container1.getId(), ContainerUpdateType.INCREASE_RESOURCE,
+ Resource.newInstance(2048, 1), null));
+ amClientImpl.requestContainerUpdate(container1,
+ UpdateContainerRequest.newInstance(container1.getVersion(),
+ container1.getId(), ContainerUpdateType.INCREASE_RESOURCE,
+ Resource.newInstance(4096, 1), null));
Assert.assertEquals(Resource.newInstance(4096, 1),
- amClientImpl.change.get(container1.getId()).getValue());
+ amClientImpl.change.get(container1.getId()).getValue().getCapability());
// verify new decrease request cancels old increase request for container1
- amClientImpl.requestContainerResourceChange(
- container1, Resource.newInstance(512, 1));
+ amClientImpl.requestContainerUpdate(container1,
+ UpdateContainerRequest.newInstance(container1.getVersion(),
+ container1.getId(), ContainerUpdateType.DECREASE_RESOURCE,
+ Resource.newInstance(512, 1), null));
Assert.assertEquals(Resource.newInstance(512, 1),
- amClientImpl.change.get(container1.getId()).getValue());
+ amClientImpl.change.get(container1.getId()).getValue().getCapability());
// request resource increase for container2
- amClientImpl.requestContainerResourceChange(
- container2, Resource.newInstance(2048, 1));
+ amClientImpl.requestContainerUpdate(container2,
+ UpdateContainerRequest.newInstance(container2.getVersion(),
+ container2.getId(), ContainerUpdateType.INCREASE_RESOURCE,
+ Resource.newInstance(2048, 1), null));
Assert.assertEquals(Resource.newInstance(2048, 1),
- amClientImpl.change.get(container2.getId()).getValue());
+ amClientImpl.change.get(container2.getId()).getValue().getCapability());
// verify release request will cancel pending change requests for the same
// container
- amClientImpl.requestContainerResourceChange(
- container3, Resource.newInstance(2048, 1));
+ amClientImpl.requestContainerUpdate(container3,
+ UpdateContainerRequest.newInstance(container3.getVersion(),
+ container3.getId(), ContainerUpdateType.INCREASE_RESOURCE,
+ Resource.newInstance(2048, 1), null));
Assert.assertEquals(3, amClientImpl.pendingChange.size());
amClientImpl.releaseAssignedContainer(container3.getId());
Assert.assertEquals(2, amClientImpl.pendingChange.size());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
index ac77446..39a7633 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -233,8 +234,11 @@ public class TestAMRMClientOnRMRestart {
nm1.nodeHeartbeat(containerId.getApplicationAttemptId(),
containerId.getContainerId(), ContainerState.RUNNING);
dispatcher.await();
- amClient.requestContainerResourceChange(
- container, Resource.newInstance(2048, 1));
+ amClient.requestContainerUpdate(
+ container, UpdateContainerRequest.newInstance(
+ container.getVersion(), container.getId(),
+ ContainerUpdateType.INCREASE_RESOURCE,
+ Resource.newInstance(2048, 1), null));
it.remove();
allocateResponse = amClient.allocate(0.3f);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
index 802c207..305d18b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -44,6 +45,8 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AMRMClient;
@@ -54,6 +57,9 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+ .AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
@@ -66,13 +72,17 @@ import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
/**
* Class that tests the allocation of OPPORTUNISTIC containers through the
@@ -83,7 +93,6 @@ public class TestOpportunisticContainerAllocation {
private static MiniYARNCluster yarnCluster = null;
private static YarnClient yarnClient = null;
private static List<NodeReport> nodeReports = null;
- private static ApplicationAttemptId attemptId = null;
private static int nodeCount = 3;
private static final int ROLLING_INTERVAL_SEC = 13;
@@ -92,12 +101,22 @@ public class TestOpportunisticContainerAllocation {
private static Resource capability;
private static Priority priority;
private static Priority priority2;
+ private static Priority priority3;
+ private static Priority priority4;
private static String node;
private static String rack;
private static String[] nodes;
private static String[] racks;
private final static int DEFAULT_ITERATION = 3;
+ // Per test..
+ private ApplicationAttemptId attemptId = null;
+ private AMRMClientImpl<AMRMClient.ContainerRequest> amClient = null;
+ private long availMB;
+ private int availVCores;
+ private long allocMB;
+ private int allocVCores;
+
@BeforeClass
public static void setup() throws Exception {
// start minicluster
@@ -106,7 +125,7 @@ public class TestOpportunisticContainerAllocation {
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
ROLLING_INTERVAL_SEC);
conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, AM_EXPIRE_MS);
- conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
+ conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 1000);
// set the minimum allocation so that resource decrease can go under 1024
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
conf.setBoolean(
@@ -129,7 +148,9 @@ public class TestOpportunisticContainerAllocation {
priority = Priority.newInstance(1);
priority2 = Priority.newInstance(2);
- capability = Resource.newInstance(1024, 1);
+ priority3 = Priority.newInstance(3);
+ priority4 = Priority.newInstance(4);
+ capability = Resource.newInstance(512, 1);
node = nodeReports.get(0).getNodeId().getHost();
rack = nodeReports.get(0).getRackName();
@@ -193,10 +214,35 @@ public class TestOpportunisticContainerAllocation {
UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
appAttempt.getAMRMToken()
.setService(ClientRMProxy.getAMRMTokenService(conf));
+
+ // start am rm client
+ amClient = (AMRMClientImpl<AMRMClient.ContainerRequest>)AMRMClient
+ .createAMRMClient();
+
+ //setting an instance NMTokenCache
+ amClient.setNMTokenCache(new NMTokenCache());
+ //asserting we are not using the singleton instance cache
+ Assert.assertNotSame(NMTokenCache.getSingleton(),
+ amClient.getNMTokenCache());
+
+ amClient.init(conf);
+ amClient.start();
+
+ amClient.registerApplicationMaster("Host", 10000, "");
}
@After
public void cancelApp() throws YarnException, IOException {
+ try {
+ amClient
+ .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
+ null);
+ } finally {
+ if (amClient != null &&
+ amClient.getServiceState() == Service.STATE.STARTED) {
+ amClient.stop();
+ }
+ }
yarnClient.killApplication(attemptId.getApplicationId());
attemptId = null;
}
@@ -214,43 +260,254 @@ public class TestOpportunisticContainerAllocation {
}
@Test(timeout = 60000)
- public void testAMRMClient() throws YarnException, IOException {
- AMRMClient<AMRMClient.ContainerRequest> amClient = null;
+ public void testPromotionFromAcquired() throws YarnException, IOException {
+ // setup container request
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
+ true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+
+ int oppContainersRequestedAny =
+ amClient.getTable(0).get(priority2, ResourceRequest.ANY,
+ ExecutionType.OPPORTUNISTIC, capability).remoteRequest
+ .getNumContainers();
+
+ assertEquals(1, oppContainersRequestedAny);
+
+ assertEquals(1, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ // RM should allocate container within 2 calls to allocate()
+ int allocatedContainerCount = 0;
+ Map<ContainerId, Container> allocatedOpportContainers = new HashMap<>();
+ int iterationsLeft = 50;
+
+ amClient.getNMTokenCache().clearCache();
+ Assert.assertEquals(0,
+ amClient.getNMTokenCache().numberOfTokensInCache());
+ HashMap<String, Token> receivedNMTokens = new HashMap<>();
+
+ updateMetrics("Before Opp Allocation");
+
+ while (allocatedContainerCount < oppContainersRequestedAny
+ && iterationsLeft-- > 0) {
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ allocatedContainerCount +=
+ allocResponse.getAllocatedContainers().size();
+ for (Container container : allocResponse.getAllocatedContainers()) {
+ if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+ allocatedOpportContainers.put(container.getId(), container);
+ removeCR(container);
+ }
+ }
+
+ for (NMToken token : allocResponse.getNMTokens()) {
+ String nodeID = token.getNodeId().toString();
+ receivedNMTokens.put(nodeID, token.getToken());
+ }
+
+ if (allocatedContainerCount < oppContainersRequestedAny) {
+ // sleep to let NM's heartbeat to RM and trigger allocations
+ sleep(100);
+ }
+ }
+
+ assertEquals(oppContainersRequestedAny, allocatedContainerCount);
+ assertEquals(oppContainersRequestedAny, allocatedOpportContainers.size());
+
+ updateMetrics("After Opp Allocation / Before Promotion");
+
try {
- // start am rm client
- amClient = AMRMClient.<AMRMClient.ContainerRequest>createAMRMClient();
+ Container c = allocatedOpportContainers.values().iterator().next();
+ amClient.requestContainerUpdate(
+ c, UpdateContainerRequest.newInstance(c.getVersion(),
+ c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
+ null, ExecutionType.OPPORTUNISTIC));
+ Assert.fail("Should throw Exception..");
+ } catch (IllegalArgumentException e) {
+ System.out.println("## " + e.getMessage());
+ Assert.assertTrue(e.getMessage().contains(
+ "target should be GUARANTEED and original should be OPPORTUNISTIC"));
+ }
- //setting an instance NMTokenCache
- amClient.setNMTokenCache(new NMTokenCache());
- //asserting we are not using the singleton instance cache
- Assert.assertNotSame(NMTokenCache.getSingleton(),
- amClient.getNMTokenCache());
+ Container c = allocatedOpportContainers.values().iterator().next();
+ amClient.requestContainerUpdate(
+ c, UpdateContainerRequest.newInstance(c.getVersion(),
+ c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
+ null, ExecutionType.GUARANTEED));
+ iterationsLeft = 120;
+ Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>();
+ // do a few iterations to ensure RM is not going to send new containers
+ while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
+ // inform RM of rejection
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ // RM did not send new containers because AM does not need any
+ if (allocResponse.getUpdatedContainers() != null) {
+ for (UpdatedContainer updatedContainer : allocResponse
+ .getUpdatedContainers()) {
+ System.out.println("Got update..");
+ updatedContainers.put(updatedContainer.getContainer().getId(),
+ updatedContainer);
+ }
+ }
+ if (iterationsLeft > 0) {
+ // sleep to make sure NM's heartbeat
+ sleep(100);
+ }
+ }
- amClient.init(conf);
- amClient.start();
+ updateMetrics("After Promotion");
+
+ assertEquals(1, updatedContainers.size());
+ for (ContainerId cId : allocatedOpportContainers.keySet()) {
+ Container orig = allocatedOpportContainers.get(cId);
+ UpdatedContainer updatedContainer = updatedContainers.get(cId);
+ assertNotNull(updatedContainer);
+ assertEquals(ExecutionType.GUARANTEED,
+ updatedContainer.getContainer().getExecutionType());
+ assertEquals(orig.getResource(),
+ updatedContainer.getContainer().getResource());
+ assertEquals(orig.getNodeId(),
+ updatedContainer.getContainer().getNodeId());
+ assertEquals(orig.getVersion() + 1,
+ updatedContainer.getContainer().getVersion());
+ }
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+ amClient.ask.clear();
+ }
- amClient.registerApplicationMaster("Host", 10000, "");
+ @Test(timeout = 60000)
+ public void testDemotionFromAcquired() throws YarnException, IOException {
+ // setup container request
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
- testOpportunisticAllocation(
- (AMRMClientImpl<AMRMClient.ContainerRequest>) amClient);
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, null, null, priority3));
- testAllocation((AMRMClientImpl<AMRMClient.ContainerRequest>)amClient);
+ int guarContainersRequestedAny = amClient.getTable(0).get(priority3,
+ ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+ .remoteRequest.getNumContainers();
- amClient
- .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
- null);
+ assertEquals(1, guarContainersRequestedAny);
- } finally {
- if (amClient != null &&
- amClient.getServiceState() == Service.STATE.STARTED) {
- amClient.stop();
+ assertEquals(1, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ // RM should allocate container within 2 calls to allocate()
+ int allocatedContainerCount = 0;
+ Map<ContainerId, Container> allocatedGuarContainers = new HashMap<>();
+ int iterationsLeft = 50;
+
+ amClient.getNMTokenCache().clearCache();
+ Assert.assertEquals(0,
+ amClient.getNMTokenCache().numberOfTokensInCache());
+ HashMap<String, Token> receivedNMTokens = new HashMap<>();
+
+ updateMetrics("Before Guar Allocation");
+
+ while (allocatedContainerCount < guarContainersRequestedAny
+ && iterationsLeft-- > 0) {
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ allocatedContainerCount +=
+ allocResponse.getAllocatedContainers().size();
+ for (Container container : allocResponse.getAllocatedContainers()) {
+ if (container.getExecutionType() == ExecutionType.GUARANTEED) {
+ allocatedGuarContainers.put(container.getId(), container);
+ removeCR(container);
+ }
+ }
+
+ for (NMToken token : allocResponse.getNMTokens()) {
+ String nodeID = token.getNodeId().toString();
+ receivedNMTokens.put(nodeID, token.getToken());
+ }
+
+ if (allocatedContainerCount < guarContainersRequestedAny) {
+ // sleep to let NM's heartbeat to RM and trigger allocations
+ sleep(100);
+ }
+ }
+
+ assertEquals(guarContainersRequestedAny, allocatedContainerCount);
+ assertEquals(guarContainersRequestedAny, allocatedGuarContainers.size());
+
+ updateMetrics("After Guar Allocation / Before Demotion");
+
+ try {
+ Container c = allocatedGuarContainers.values().iterator().next();
+ amClient.requestContainerUpdate(
+ c, UpdateContainerRequest.newInstance(c.getVersion(),
+ c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
+ null, ExecutionType.GUARANTEED));
+ Assert.fail("Should throw Exception..");
+ } catch (IllegalArgumentException e) {
+ System.out.println("## " + e.getMessage());
+ Assert.assertTrue(e.getMessage().contains(
+ "target should be OPPORTUNISTIC and original should be GUARANTEED"));
+ }
+
+ Container c = allocatedGuarContainers.values().iterator().next();
+ amClient.requestContainerUpdate(
+ c, UpdateContainerRequest.newInstance(c.getVersion(),
+ c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
+ null, ExecutionType.OPPORTUNISTIC));
+ iterationsLeft = 120;
+ Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>();
+ // do a few iterations to ensure RM is not going to send new containers
+ while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
+ // inform RM of rejection
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ // RM did not send new containers because AM does not need any
+ if (allocResponse.getUpdatedContainers() != null) {
+ for (UpdatedContainer updatedContainer : allocResponse
+ .getUpdatedContainers()) {
+ System.out.println("Got update..");
+ updatedContainers.put(updatedContainer.getContainer().getId(),
+ updatedContainer);
+ }
+ }
+ if (iterationsLeft > 0) {
+ // sleep to make sure NM's heartbeat
+ sleep(100);
}
}
+
+ updateMetrics("After Demotion");
+
+ assertEquals(1, updatedContainers.size());
+ for (ContainerId cId : allocatedGuarContainers.keySet()) {
+ Container orig = allocatedGuarContainers.get(cId);
+ UpdatedContainer updatedContainer = updatedContainers.get(cId);
+ assertNotNull(updatedContainer);
+ assertEquals(ExecutionType.OPPORTUNISTIC,
+ updatedContainer.getContainer().getExecutionType());
+ assertEquals(orig.getResource(),
+ updatedContainer.getContainer().getResource());
+ assertEquals(orig.getNodeId(),
+ updatedContainer.getContainer().getNodeId());
+ assertEquals(orig.getVersion() + 1,
+ updatedContainer.getContainer().getVersion());
+ }
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+ amClient.ask.clear();
}
- private void testAllocation(
- final AMRMClientImpl<AMRMClient.ContainerRequest> amClient)
- throws YarnException, IOException {
+ @Test(timeout = 60000)
+ public void testMixedAllocationAndRelease() throws YarnException,
+ IOException {
// setup container request
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
@@ -274,6 +531,28 @@ public class TestOpportunisticContainerAllocation {
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
+ int containersRequestedNode = amClient.getTable(0).get(priority,
+ node, ExecutionType.GUARANTEED, capability).remoteRequest
+ .getNumContainers();
+ int containersRequestedRack = amClient.getTable(0).get(priority,
+ rack, ExecutionType.GUARANTEED, capability).remoteRequest
+ .getNumContainers();
+ int containersRequestedAny = amClient.getTable(0).get(priority,
+ ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+ .remoteRequest.getNumContainers();
+ int oppContainersRequestedAny =
+ amClient.getTable(0).get(priority2, ResourceRequest.ANY,
+ ExecutionType.OPPORTUNISTIC, capability).remoteRequest
+ .getNumContainers();
+
+ assertEquals(4, containersRequestedNode);
+ assertEquals(4, containersRequestedRack);
+ assertEquals(4, containersRequestedAny);
+ assertEquals(2, oppContainersRequestedAny);
+
+ assertEquals(4, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
@@ -284,16 +563,16 @@ public class TestOpportunisticContainerAllocation {
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
- int containersRequestedNode = amClient.getTable(0).get(priority,
+ containersRequestedNode = amClient.getTable(0).get(priority,
node, ExecutionType.GUARANTEED, capability).remoteRequest
.getNumContainers();
- int containersRequestedRack = amClient.getTable(0).get(priority,
+ containersRequestedRack = amClient.getTable(0).get(priority,
rack, ExecutionType.GUARANTEED, capability).remoteRequest
.getNumContainers();
- int containersRequestedAny = amClient.getTable(0).get(priority,
+ containersRequestedAny = amClient.getTable(0).get(priority,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
.remoteRequest.getNumContainers();
- int oppContainersRequestedAny =
+ oppContainersRequestedAny =
amClient.getTable(0).get(priority2, ResourceRequest.ANY,
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
.getNumContainers();
@@ -309,7 +588,7 @@ public class TestOpportunisticContainerAllocation {
// RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0;
int allocatedOpportContainerCount = 0;
- int iterationsLeft = 10;
+ int iterationsLeft = 50;
Set<ContainerId> releases = new TreeSet<>();
amClient.getNMTokenCache().clearCache();
@@ -324,8 +603,8 @@ public class TestOpportunisticContainerAllocation {
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
- allocatedContainerCount += allocResponse.getAllocatedContainers()
- .size();
+ allocatedContainerCount +=
+ allocResponse.getAllocatedContainers().size();
for (Container container : allocResponse.getAllocatedContainers()) {
if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
allocatedOpportContainerCount++;
@@ -345,9 +624,9 @@ public class TestOpportunisticContainerAllocation {
}
}
- assertEquals(allocatedContainerCount,
- containersRequestedAny + oppContainersRequestedAny);
- assertEquals(allocatedOpportContainerCount, oppContainersRequestedAny);
+ assertEquals(containersRequestedAny + oppContainersRequestedAny,
+ allocatedContainerCount);
+ assertEquals(oppContainersRequestedAny, allocatedOpportContainerCount);
for (ContainerId rejectContainerId : releases) {
amClient.releaseAssignedContainer(rejectContainerId);
}
@@ -395,26 +674,25 @@ public class TestOpportunisticContainerAllocation {
/**
* Tests allocation with requests comprising only opportunistic containers.
*/
- private void testOpportunisticAllocation(
- final AMRMClientImpl<AMRMClient.ContainerRequest> amClient)
- throws YarnException, IOException {
+ @Test(timeout = 60000)
+ public void testOpportunisticAllocation() throws YarnException, IOException {
// setup container request
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, null, null, priority, 0,
+ new AMRMClient.ContainerRequest(capability, null, null, priority3, 0,
true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, null, null, priority, 0,
+ new AMRMClient.ContainerRequest(capability, null, null, priority3, 0,
true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
int oppContainersRequestedAny =
- amClient.getTable(0).get(priority, ResourceRequest.ANY,
+ amClient.getTable(0).get(priority3, ResourceRequest.ANY,
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
.getNumContainers();
@@ -456,9 +734,43 @@ public class TestOpportunisticContainerAllocation {
}
}
+ assertEquals(oppContainersRequestedAny, allocatedContainerCount);
assertEquals(1, receivedNMTokens.values().size());
}
+ private void removeCR(Container container) {
+ List<? extends Collection<AMRMClient.ContainerRequest>>
+ matchingRequests = amClient.getMatchingRequests(container
+ .getPriority(),
+ ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC,
+ container.getResource());
+ Set<AMRMClient.ContainerRequest> toRemove = new HashSet<>();
+ for (Collection<AMRMClient.ContainerRequest> rc : matchingRequests) {
+ for (AMRMClient.ContainerRequest cr : rc) {
+ toRemove.add(cr);
+ }
+ }
+ for (AMRMClient.ContainerRequest cr : toRemove) {
+ amClient.removeContainerRequest(cr);
+ }
+ }
+
+ private void updateMetrics(String msg) {
+ AbstractYarnScheduler scheduler =
+ (AbstractYarnScheduler)yarnCluster.getResourceManager()
+ .getResourceScheduler();
+ availMB = scheduler.getRootQueueMetrics().getAvailableMB();
+ availVCores = scheduler.getRootQueueMetrics().getAvailableVirtualCores();
+ allocMB = scheduler.getRootQueueMetrics().getAllocatedMB();
+ allocVCores = scheduler.getRootQueueMetrics().getAllocatedVirtualCores();
+ System.out.println("## METRICS (" + msg + ")==>");
+ System.out.println(" : availMB=" + availMB + ", " +
+ "availVCores=" +availVCores + ", " +
+ "allocMB=" + allocMB + ", " +
+ "allocVCores=" + allocVCores + ", ");
+ System.out.println("<== ##");
+ }
+
private void sleep(int sleepTime) {
try {
Thread.sleep(sleepTime);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerErrorPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerErrorPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerErrorPBImpl.java
index fb6c1a7..8ff9d9e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerErrorPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerErrorPBImpl.java
@@ -74,6 +74,22 @@ public class UpdateContainerErrorPBImpl extends UpdateContainerError {
}
@Override
+ public int getCurrentContainerVersion() {
+ YarnServiceProtos.UpdateContainerErrorProtoOrBuilder p =
+ viaProto ? proto : builder;
+ if (!p.hasCurrentContainerVersion()) {
+ return 0;
+ }
+ return p.getCurrentContainerVersion();
+ }
+
+ @Override
+ public void setCurrentContainerVersion(int containerVersion) {
+ maybeInitBuilder();
+ builder.setCurrentContainerVersion(containerVersion);
+ }
+
+ @Override
public UpdateContainerRequest getUpdateContainerRequest() {
YarnServiceProtos.UpdateContainerErrorProtoOrBuilder p = viaProto ? proto
: builder;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index 94bfd58..224a1da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -188,19 +188,25 @@ public class RMServerUtils {
}
}
}
- checkAndcreateUpdateError(updateErrors, updateReq, msg);
+ checkAndcreateUpdateError(updateErrors, updateReq, rmContainer, msg);
}
return updateRequests;
}
private static void checkAndcreateUpdateError(
List<UpdateContainerError> errors, UpdateContainerRequest updateReq,
- String msg) {
+ RMContainer rmContainer, String msg) {
if (msg != null) {
UpdateContainerError updateError = RECORD_FACTORY
.newRecordInstance(UpdateContainerError.class);
updateError.setReason(msg);
updateError.setUpdateContainerRequest(updateReq);
+ if (rmContainer != null) {
+ updateError.setCurrentContainerVersion(
+ rmContainer.getContainer().getVersion());
+ } else {
+ updateError.setCurrentContainerVersion(-1);
+ }
errors.add(updateError);
}
}
@@ -216,9 +222,7 @@ public class RMServerUtils {
// version
if (msg == null && updateReq.getContainerVersion() !=
rmContainer.getContainer().getVersion()) {
- msg = INCORRECT_CONTAINER_VERSION_ERROR + "|"
- + updateReq.getContainerVersion() + "|"
- + rmContainer.getContainer().getVersion();
+ msg = INCORRECT_CONTAINER_VERSION_ERROR;
}
// No more than 1 container update per request.
if (msg == null &&
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
index 641ef64..b083642 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -251,8 +251,11 @@ public class TestOpportunisticContainerAllocatorAMService {
Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
Assert.assertEquals(1, allocateResponse.getUpdateErrors().size());
- Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR|1|0",
+ Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR",
allocateResponse.getUpdateErrors().get(0).getReason());
+ Assert.assertEquals(0,
+ allocateResponse.getUpdateErrors().get(0)
+ .getCurrentContainerVersion());
Assert.assertEquals(container.getId(),
allocateResponse.getUpdateErrors().get(0)
.getUpdateContainerRequest().getContainerId());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
index c5829cf..74cecf2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
@@ -275,8 +275,10 @@ public class TestIncreaseAllocationExpirer {
Resources.createResource(5 * GB), null)));
List<UpdateContainerError> updateErrors = response.getUpdateErrors();
Assert.assertEquals(1, updateErrors.size());
- Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR|0|1",
+ Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR",
updateErrors.get(0).getReason());
+ Assert.assertEquals(1,
+ updateErrors.get(0).getCurrentContainerVersion());
// am1 asks to change containerId2 from 3GB to 5GB
am1.sendContainerResizingRequest(Collections.singletonList(
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org