You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2017/02/14 14:10:27 UTC

hadoop git commit: YARN-5966. AMRMClient changes to support ExecutionType update. (asuresh)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 4164a2032 -> aaf106fde


YARN-5966. AMRMClient changes to support ExecutionType update. (asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/aaf106fd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/aaf106fd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/aaf106fd

Branch: refs/heads/trunk
Commit: aaf106fde35ec97e2e2ea4d7a67434038c4273ac
Parents: 4164a20
Author: Arun Suresh <as...@apache.org>
Authored: Tue Feb 14 06:08:27 2017 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Tue Feb 14 06:09:10 2017 -0800

----------------------------------------------------------------------
 .../yarn/api/records/UpdateContainerError.java  |  19 +-
 .../src/main/proto/yarn_service_protos.proto    |   1 +
 .../hadoop/yarn/client/api/AMRMClient.java      |  33 +-
 .../yarn/client/api/async/AMRMClientAsync.java  |  33 +-
 .../api/async/impl/AMRMClientAsyncImpl.java     |   7 +-
 .../yarn/client/api/impl/AMRMClientImpl.java    | 111 +++--
 .../yarn/client/api/impl/TestAMRMClient.java    |  60 ++-
 .../api/impl/TestAMRMClientOnRMRestart.java     |   8 +-
 .../TestOpportunisticContainerAllocation.java   | 400 +++++++++++++++++--
 .../impl/pb/UpdateContainerErrorPBImpl.java     |  16 +
 .../server/resourcemanager/RMServerUtils.java   |  14 +-
 ...pportunisticContainerAllocatorAMService.java |   5 +-
 .../capacity/TestIncreaseAllocationExpirer.java |   4 +-
 13 files changed, 587 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java
index e7458cf..4d184cb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java
@@ -59,6 +59,22 @@ public abstract class UpdateContainerError {
   public abstract void setReason(String reason);
 
   /**
+   * Get current container version.
+   * @return Current container Version.
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Unstable
+  public abstract int getCurrentContainerVersion();
+
+  /**
+   * Set current container version.
+   * @param currentVersion Current container version.
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Unstable
+  public abstract void setCurrentContainerVersion(int currentVersion);
+
+  /**
    * Get the {@code UpdateContainerRequest} that was not satisfiable.
    * @return UpdateContainerRequest
    */
@@ -89,6 +105,7 @@ public abstract class UpdateContainerError {
   @Override
   public String toString() {
     return "UpdateContainerError{reason=" + getReason() + ", "
+        + "currentVersion=" + getCurrentContainerVersion() + ", "
         + "req=" + getUpdateContainerRequest() + "}";
   }
 
@@ -120,6 +137,6 @@ public abstract class UpdateContainerError {
     } else if (!req.equals(other.getUpdateContainerRequest())) {
       return false;
     }
-    return true;
+    return getCurrentContainerVersion() == other.getCurrentContainerVersion();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index df3c852..c6647c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -78,6 +78,7 @@ message UpdateContainerRequestProto {
 message UpdateContainerErrorProto {
   optional string reason = 1;
   optional UpdateContainerRequestProto update_request = 2;
+  optional int32 current_container_version = 3;
 }
 
 message AllocateRequestProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
index 52155f5..15d0065 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
@@ -33,17 +33,20 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 @InterfaceAudience.Public
 @InterfaceStability.Stable
@@ -518,12 +521,38 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
    * ResourceManager to change the existing resource allocation to the target
    * resource allocation.
    *
+   * @deprecated use
+   * {@link #requestContainerUpdate(Container, UpdateContainerRequest)}
+   *
    * @param container The container returned from the last successful resource
    *                  allocation or resource change
    * @param capability  The target resource capability of the container
    */
-  public abstract void requestContainerResourceChange(
-      Container container, Resource capability);
+  @Deprecated
+  public void requestContainerResourceChange(
+      Container container, Resource capability) {
+    Preconditions.checkNotNull(container, "Container cannot be null!!");
+    Preconditions.checkNotNull(capability,
+        "Resource capability cannot be null!!");
+    requestContainerUpdate(container, UpdateContainerRequest.newInstance(
+        container.getVersion(), container.getId(),
+        Resources.fitsIn(capability, container.getResource()) ?
+            ContainerUpdateType.DECREASE_RESOURCE :
+            ContainerUpdateType.INCREASE_RESOURCE,
+        capability, null));
+  }
+
+  /**
+   * Request a container update before calling <code>allocate</code>.
+   * Any previous pending update request of the same container will be
+   * removed.
+   *
+   * @param container The container returned from the last successful resource
+   *                  allocation or update
+   * @param updateContainerRequest The <code>UpdateContainerRequest</code>.
+   */
+  public abstract void requestContainerUpdate(
+      Container container, UpdateContainerRequest updateContainerRequest);
 
   /**
    * Release containers assigned by the Resource Manager. If the app cannot use

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
index d2195a6..4cb27cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
@@ -36,11 +36,13 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
  * <code>AMRMClientAsync</code> handles communication with the ResourceManager
@@ -284,12 +287,38 @@ extends AbstractService {
    * ResourceManager to change the existing resource allocation to the target
    * resource allocation.
    *
+   * @deprecated use
+   * {@link #requestContainerUpdate(Container, UpdateContainerRequest)}
+   *
    * @param container The container returned from the last successful resource
    *                  allocation or resource change
    * @param capability  The target resource capability of the container
    */
-  public abstract void requestContainerResourceChange(
-      Container container, Resource capability);
+  @Deprecated
+  public void requestContainerResourceChange(
+      Container container, Resource capability) {
+    Preconditions.checkNotNull(container, "Container cannot be null!!");
+    Preconditions.checkNotNull(capability,
+        "Resource capability cannot be null!!");
+    requestContainerUpdate(container, UpdateContainerRequest.newInstance(
+        container.getVersion(), container.getId(),
+        Resources.fitsIn(capability, container.getResource()) ?
+            ContainerUpdateType.DECREASE_RESOURCE :
+            ContainerUpdateType.INCREASE_RESOURCE,
+        capability, null));
+  }
+
+  /**
+   * Request a container update before calling <code>allocate</code>.
+   * Any previous pending update request of the same container will be
+   * removed.
+   *
+   * @param container The container returned from the last successful resource
+   *                  allocation or update
+   * @param updateContainerRequest The <code>UpdateContainerRequest</code>.
+   */
+  public abstract void requestContainerUpdate(
+      Container container, UpdateContainerRequest updateContainerRequest);
 
   /**
    * Release containers assigned by the Resource Manager. If the app cannot use

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index 3e72d3f..9e2c0e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@@ -207,9 +208,9 @@ extends AMRMClientAsync<T> {
   }
 
   @Override
-  public void requestContainerResourceChange(
-      Container container, Resource capability) {
-    client.requestContainerResourceChange(container, capability);
+  public void requestContainerUpdate(Container container,
+      UpdateContainerRequest updateContainerRequest) {
+    client.requestContainerUpdate(container, updateContainerRequest);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
index 44fc1e0..7da91de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
@@ -169,15 +169,16 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();
   // change map holds container resource change requests between two allocate()
   // calls, and are cleared after each successful allocate() call.
-  protected final Map<ContainerId, SimpleEntry<Container, Resource>> change =
-      new HashMap<>();
+  protected final Map<ContainerId,
+      SimpleEntry<Container, UpdateContainerRequest>> change = new HashMap<>();
   // pendingChange map holds history of container resource change requests in
   // case AM needs to reregister with the ResourceManager.
   // Change requests are removed from this map if RM confirms the change
   // through allocate response, or if RM confirms that the container has been
   // completed.
-  protected final Map<ContainerId, SimpleEntry<Container, Resource>>
-      pendingChange = new HashMap<>();
+  protected final Map<ContainerId,
+      SimpleEntry<Container, UpdateContainerRequest>> pendingChange =
+      new HashMap<>();
 
   public AMRMClientImpl() {
     super(AMRMClientImpl.class.getName());
@@ -259,7 +260,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     AllocateRequest allocateRequest = null;
     List<String> blacklistToAdd = new ArrayList<String>();
     List<String> blacklistToRemove = new ArrayList<String>();
-    Map<ContainerId, SimpleEntry<Container, Resource>> oldChange =
+    Map<ContainerId, SimpleEntry<Container, UpdateContainerRequest>> oldChange =
         new HashMap<>();
     try {
       synchronized (this) {
@@ -374,14 +375,15 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
           //
           // Only insert entries from the cached oldChange map
           // that do not exist in the current change map:
-          for (Map.Entry<ContainerId, SimpleEntry<Container, Resource>> entry :
+          for (Map.Entry<ContainerId,
+              SimpleEntry<Container, UpdateContainerRequest>> entry :
               oldChange.entrySet()) {
             ContainerId oldContainerId = entry.getKey();
             Container oldContainer = entry.getValue().getKey();
-            Resource oldResource = entry.getValue().getValue();
+            UpdateContainerRequest oldupdate = entry.getValue().getValue();
             if (change.get(oldContainerId) == null) {
               change.put(
-                  oldContainerId, new SimpleEntry<>(oldContainer, oldResource));
+                  oldContainerId, new SimpleEntry<>(oldContainer, oldupdate));
             }
           }
           blacklistAdditions.addAll(blacklistToAdd);
@@ -394,19 +396,17 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
 
   private List<UpdateContainerRequest> createUpdateList() {
     List<UpdateContainerRequest> updateList = new ArrayList<>();
-    for (Map.Entry<ContainerId, SimpleEntry<Container, Resource>> entry :
-        change.entrySet()) {
-      Resource targetCapability = entry.getValue().getValue();
-      Resource currCapability = entry.getValue().getKey().getResource();
-      int version = entry.getValue().getKey().getVersion();
+    for (Map.Entry<ContainerId, SimpleEntry<Container,
+        UpdateContainerRequest>> entry : change.entrySet()) {
+      Resource targetCapability = entry.getValue().getValue().getCapability();
+      ExecutionType targetExecType =
+          entry.getValue().getValue().getExecutionType();
       ContainerUpdateType updateType =
-          ContainerUpdateType.INCREASE_RESOURCE;
-      if (Resources.fitsIn(targetCapability, currCapability)) {
-        updateType = ContainerUpdateType.DECREASE_RESOURCE;
-      }
+          entry.getValue().getValue().getContainerUpdateType();
+      int version = entry.getValue().getKey().getVersion();
       updateList.add(
           UpdateContainerRequest.newInstance(version, entry.getKey(),
-              updateType, targetCapability, null));
+              updateType, targetCapability, targetExecType));
     }
     return updateList;
   }
@@ -591,21 +591,47 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   }
 
   @Override
-  public synchronized void requestContainerResourceChange(
-      Container container, Resource capability) {
-    validateContainerResourceChangeRequest(
-        container.getId(), container.getResource(), capability);
+  public synchronized void requestContainerUpdate(
+      Container container, UpdateContainerRequest updateContainerRequest) {
+    Preconditions.checkNotNull(container, "Container cannot be null!!");
+    Preconditions.checkNotNull(updateContainerRequest,
+        "UpdateContainerRequest cannot be null!!");
+    LOG.info("Requesting Container update : " +
+        "container=" + container + ", " +
+        "updateType=" + updateContainerRequest.getContainerUpdateType() + ", " +
+        "targetCapability=" + updateContainerRequest.getCapability() + ", " +
+        "targetExecType=" + updateContainerRequest.getExecutionType());
+    if (updateContainerRequest.getCapability() != null &&
+        updateContainerRequest.getExecutionType() == null) {
+      validateContainerResourceChangeRequest(
+          updateContainerRequest.getContainerUpdateType(),
+          container.getId(), container.getResource(),
+          updateContainerRequest.getCapability());
+    } else if (updateContainerRequest.getExecutionType() != null &&
+        updateContainerRequest.getCapability() == null) {
+      validateContainerExecTypeChangeRequest(
+          updateContainerRequest.getContainerUpdateType(),
+          container.getId(), container.getExecutionType(),
+          updateContainerRequest.getExecutionType());
+    } else if (updateContainerRequest.getExecutionType() == null &&
+        updateContainerRequest.getCapability() == null) {
+      throw new IllegalArgumentException("Both target Capability and " +
+          "target Execution Type are null");
+    } else {
+      throw new IllegalArgumentException("Support currently exists only for" +
+          " EITHER update of Capability OR update of Execution Type NOT both");
+    }
     if (change.get(container.getId()) == null) {
       change.put(container.getId(),
-          new SimpleEntry<>(container, capability));
+          new SimpleEntry<>(container, updateContainerRequest));
     } else {
-      change.get(container.getId()).setValue(capability);
+      change.get(container.getId()).setValue(updateContainerRequest);
     }
     if (pendingChange.get(container.getId()) == null) {
       pendingChange.put(container.getId(),
-          new SimpleEntry<>(container, capability));
+          new SimpleEntry<>(container, updateContainerRequest));
     } else {
-      pendingChange.get(container.getId()).setValue(capability);
+      pendingChange.get(container.getId()).setValue(updateContainerRequest);
     }
   }
 
@@ -755,7 +781,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   }
 
   private void validateContainerResourceChangeRequest(
-      ContainerId containerId, Resource original, Resource target) {
+      ContainerUpdateType updateType, ContainerId containerId,
+      Resource original, Resource target) {
     Preconditions.checkArgument(containerId != null,
         "ContainerId cannot be null");
     Preconditions.checkArgument(original != null,
@@ -768,6 +795,36 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     Preconditions.checkArgument(!Resources.equals(Resources.none(), target)
             && Resources.fitsIn(Resources.none(), target),
         "Target resource capability must be greater than 0");
+    if (ContainerUpdateType.DECREASE_RESOURCE == updateType) {
+      Preconditions.checkArgument(Resources.fitsIn(target, original),
+          "Target resource capability must fit in Original capability");
+    } else {
+      Preconditions.checkArgument(Resources.fitsIn(original, target),
+          "Target resource capability must be more than Original capability");
+
+    }
+  }
+
+  private void validateContainerExecTypeChangeRequest(
+      ContainerUpdateType updateType, ContainerId containerId,
+      ExecutionType original, ExecutionType target) {
+    Preconditions.checkArgument(containerId != null,
+        "ContainerId cannot be null");
+    Preconditions.checkArgument(original != null,
+        "Original Execution Type cannot be null");
+    Preconditions.checkArgument(target != null,
+        "Target Execution Type cannot be null");
+    if (ContainerUpdateType.DEMOTE_EXECUTION_TYPE == updateType) {
+      Preconditions.checkArgument(target == ExecutionType.OPPORTUNISTIC
+              && original == ExecutionType.GUARANTEED,
+          "Incorrect Container update request, target should be" +
+              " OPPORTUNISTIC and original should be GUARANTEED");
+    } else {
+      Preconditions.checkArgument(target == ExecutionType.GUARANTEED
+                  && original == ExecutionType.OPPORTUNISTIC,
+          "Incorrect Container update request, target should be" +
+              " GUARANTEED and original should be OPPORTUNISTIC");
+    }
   }
 
   private void addResourceRequestToAsk(ResourceRequest remoteRequest) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
index 8b1bbc7..4f73bac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
@@ -51,29 +51,7 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.NMToken;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.api.records.UpdatedContainer;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@@ -1058,26 +1036,36 @@ public class TestAMRMClient {
         (AMRMClientImpl<ContainerRequest>) amClient;
     Assert.assertEquals(0, amClientImpl.change.size());
     // verify newer request overwrites older request for the container1
-    amClientImpl.requestContainerResourceChange(
-        container1, Resource.newInstance(2048, 1));
-    amClientImpl.requestContainerResourceChange(
-        container1, Resource.newInstance(4096, 1));
+    amClientImpl.requestContainerUpdate(container1,
+        UpdateContainerRequest.newInstance(container1.getVersion(),
+            container1.getId(), ContainerUpdateType.INCREASE_RESOURCE,
+            Resource.newInstance(2048, 1), null));
+    amClientImpl.requestContainerUpdate(container1,
+        UpdateContainerRequest.newInstance(container1.getVersion(),
+            container1.getId(), ContainerUpdateType.INCREASE_RESOURCE,
+            Resource.newInstance(4096, 1), null));
     Assert.assertEquals(Resource.newInstance(4096, 1),
-        amClientImpl.change.get(container1.getId()).getValue());
+        amClientImpl.change.get(container1.getId()).getValue().getCapability());
     // verify new decrease request cancels old increase request for container1
-    amClientImpl.requestContainerResourceChange(
-        container1, Resource.newInstance(512, 1));
+    amClientImpl.requestContainerUpdate(container1,
+        UpdateContainerRequest.newInstance(container1.getVersion(),
+            container1.getId(), ContainerUpdateType.DECREASE_RESOURCE,
+            Resource.newInstance(512, 1), null));
     Assert.assertEquals(Resource.newInstance(512, 1),
-        amClientImpl.change.get(container1.getId()).getValue());
+        amClientImpl.change.get(container1.getId()).getValue().getCapability());
     // request resource increase for container2
-    amClientImpl.requestContainerResourceChange(
-        container2, Resource.newInstance(2048, 1));
+    amClientImpl.requestContainerUpdate(container2,
+        UpdateContainerRequest.newInstance(container2.getVersion(),
+            container2.getId(), ContainerUpdateType.INCREASE_RESOURCE,
+            Resource.newInstance(2048, 1), null));
     Assert.assertEquals(Resource.newInstance(2048, 1),
-        amClientImpl.change.get(container2.getId()).getValue());
+        amClientImpl.change.get(container2.getId()).getValue().getCapability());
     // verify release request will cancel pending change requests for the same
     // container
-    amClientImpl.requestContainerResourceChange(
-        container3, Resource.newInstance(2048, 1));
+    amClientImpl.requestContainerUpdate(container3,
+        UpdateContainerRequest.newInstance(container3.getVersion(),
+            container3.getId(), ContainerUpdateType.INCREASE_RESOURCE,
+            Resource.newInstance(2048, 1), null));
     Assert.assertEquals(3, amClientImpl.pendingChange.size());
     amClientImpl.releaseAssignedContainer(container3.getId());
     Assert.assertEquals(2, amClientImpl.pendingChange.size());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
index ac77446..39a7633 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -233,8 +234,11 @@ public class TestAMRMClientOnRMRestart {
     nm1.nodeHeartbeat(containerId.getApplicationAttemptId(),
         containerId.getContainerId(), ContainerState.RUNNING);
     dispatcher.await();
-    amClient.requestContainerResourceChange(
-        container, Resource.newInstance(2048, 1));
+    amClient.requestContainerUpdate(
+        container, UpdateContainerRequest.newInstance(
+            container.getVersion(), container.getId(),
+            ContainerUpdateType.INCREASE_RESOURCE,
+            Resource.newInstance(2048, 1), null));
     it.remove();
 
     allocateResponse = amClient.allocate(0.3f);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
index 802c207..305d18b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -44,6 +45,8 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
@@ -54,6 +57,9 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.After;
@@ -66,13 +72,17 @@ import org.junit.Test;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 /**
  * Class that tests the allocation of OPPORTUNISTIC containers through the
@@ -83,7 +93,6 @@ public class TestOpportunisticContainerAllocation {
   private static MiniYARNCluster yarnCluster = null;
   private static YarnClient yarnClient = null;
   private static List<NodeReport> nodeReports = null;
-  private static ApplicationAttemptId attemptId = null;
   private static int nodeCount = 3;
 
   private static final int ROLLING_INTERVAL_SEC = 13;
@@ -92,12 +101,22 @@ public class TestOpportunisticContainerAllocation {
   private static Resource capability;
   private static Priority priority;
   private static Priority priority2;
+  private static Priority priority3;
+  private static Priority priority4;
   private static String node;
   private static String rack;
   private static String[] nodes;
   private static String[] racks;
   private final static int DEFAULT_ITERATION = 3;
 
+  // Per test..
+  private ApplicationAttemptId attemptId = null;
+  private AMRMClientImpl<AMRMClient.ContainerRequest> amClient = null;
+  private long availMB;
+  private int availVCores;
+  private long allocMB;
+  private int allocVCores;
+
   @BeforeClass
   public static void setup() throws Exception {
     // start minicluster
@@ -106,7 +125,7 @@ public class TestOpportunisticContainerAllocation {
         YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
         ROLLING_INTERVAL_SEC);
     conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, AM_EXPIRE_MS);
-    conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
+    conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 1000);
     // set the minimum allocation so that resource decrease can go under 1024
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
     conf.setBoolean(
@@ -129,7 +148,9 @@ public class TestOpportunisticContainerAllocation {
 
     priority = Priority.newInstance(1);
     priority2 = Priority.newInstance(2);
-    capability = Resource.newInstance(1024, 1);
+    priority3 = Priority.newInstance(3);
+    priority4 = Priority.newInstance(4);
+    capability = Resource.newInstance(512, 1);
 
     node = nodeReports.get(0).getNodeId().getHost();
     rack = nodeReports.get(0).getRackName();
@@ -193,10 +214,35 @@ public class TestOpportunisticContainerAllocation {
     UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
     appAttempt.getAMRMToken()
         .setService(ClientRMProxy.getAMRMTokenService(conf));
+
+    // start am rm client
+    amClient = (AMRMClientImpl<AMRMClient.ContainerRequest>)AMRMClient
+        .createAMRMClient();
+
+    //setting an instance NMTokenCache
+    amClient.setNMTokenCache(new NMTokenCache());
+    //asserting we are not using the singleton instance cache
+    Assert.assertNotSame(NMTokenCache.getSingleton(),
+        amClient.getNMTokenCache());
+
+    amClient.init(conf);
+    amClient.start();
+
+    amClient.registerApplicationMaster("Host", 10000, "");
   }
 
   @After
   public void cancelApp() throws YarnException, IOException {
+    // Tear down in order: unregister the AM, stop the client, kill the app.
+    // The steps are nested in finally blocks so that a failure in an earlier
+    // step cannot skip the later ones and leave the application running.
+    try {
+      // amClient is still null if the per-test setup did not get as far as
+      // creating it; guard so teardown does not fail with an NPE of its own.
+      if (amClient != null) {
+        amClient.unregisterApplicationMaster(
+            FinalApplicationStatus.SUCCEEDED, null, null);
+      }
+    } finally {
+      try {
+        if (amClient != null &&
+            amClient.getServiceState() == Service.STATE.STARTED) {
+          amClient.stop();
+        }
+      } finally {
+        yarnClient.killApplication(attemptId.getApplicationId());
+        attemptId = null;
+      }
+    }
-    yarnClient.killApplication(attemptId.getApplicationId());
-    attemptId = null;
   }
@@ -214,43 +260,254 @@ public class TestOpportunisticContainerAllocation {
   }
 
   @Test(timeout = 60000)
-  public void testAMRMClient() throws YarnException, IOException {
-    AMRMClient<AMRMClient.ContainerRequest> amClient = null;
+  public void testPromotionFromAcquired() throws YarnException, IOException {
+    // Promotion test: ask for a single OPPORTUNISTIC container (priority2)
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
+            true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true)));
+
+    int oppContainersRequestedAny =
+        amClient.getTable(0).get(priority2, ResourceRequest.ANY,
+            ExecutionType.OPPORTUNISTIC, capability).remoteRequest
+            .getNumContainers();
+
+    assertEquals(1, oppContainersRequestedAny);
+
+    assertEquals(1, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    // poll allocate() (at most 50 times) until the container is allocated
+    int allocatedContainerCount = 0;
+    Map<ContainerId, Container> allocatedOpportContainers = new HashMap<>();
+    int iterationsLeft = 50;
+
+    amClient.getNMTokenCache().clearCache();
+    Assert.assertEquals(0,
+        amClient.getNMTokenCache().numberOfTokensInCache());
+    HashMap<String, Token> receivedNMTokens = new HashMap<>();
+
+    updateMetrics("Before Opp Allocation");
+
+    while (allocatedContainerCount < oppContainersRequestedAny
+        && iterationsLeft-- > 0) {
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      assertEquals(0, amClient.ask.size());
+      assertEquals(0, amClient.release.size());
+
+      allocatedContainerCount +=
+          allocResponse.getAllocatedContainers().size();
+      for (Container container : allocResponse.getAllocatedContainers()) {
+        if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+          allocatedOpportContainers.put(container.getId(), container);
+          removeCR(container);
+        }
+      }
+
+      for (NMToken token : allocResponse.getNMTokens()) {
+        String nodeID = token.getNodeId().toString();
+        receivedNMTokens.put(nodeID, token.getToken());
+      }
+
+      if (allocatedContainerCount < oppContainersRequestedAny) {
+        // sleep so the NMs can heartbeat to the RM and trigger allocations
+        sleep(100);
+      }
+    }
+
+    assertEquals(oppContainersRequestedAny, allocatedContainerCount);
+    assertEquals(oppContainersRequestedAny, allocatedOpportContainers.size());
+
+    updateMetrics("After Opp Allocation / Before Promotion");
+
     try {
-      // start am rm client
-      amClient = AMRMClient.<AMRMClient.ContainerRequest>createAMRMClient();
+      Container c = allocatedOpportContainers.values().iterator().next();
+      amClient.requestContainerUpdate(
+          c, UpdateContainerRequest.newInstance(c.getVersion(),
+              c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
+              null, ExecutionType.OPPORTUNISTIC));
+      Assert.fail("Should throw Exception..");
+    } catch (IllegalArgumentException e) {
+      System.out.println("## " + e.getMessage());
+      Assert.assertTrue(e.getMessage().contains(
+          "target should be GUARANTEED and original should be OPPORTUNISTIC"));
+    }
 
-      //setting an instance NMTokenCache
-      amClient.setNMTokenCache(new NMTokenCache());
-      //asserting we are not using the singleton instance cache
-      Assert.assertNotSame(NMTokenCache.getSingleton(),
-          amClient.getNMTokenCache());
+    Container c = allocatedOpportContainers.values().iterator().next();
+    amClient.requestContainerUpdate(
+        c, UpdateContainerRequest.newInstance(c.getVersion(),
+            c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
+            null, ExecutionType.GUARANTEED));
+    iterationsLeft = 120;
+    Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>();
+    // poll allocate() (at most 120 times) until an updated container arrives
+    while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
+      // allocate() presumably carries the pending update request to the RM
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      // collect every container the RM reports as updated in this response
+      if (allocResponse.getUpdatedContainers() != null) {
+        for (UpdatedContainer updatedContainer : allocResponse
+            .getUpdatedContainers()) {
+          System.out.println("Got update..");
+          updatedContainers.put(updatedContainer.getContainer().getId(),
+              updatedContainer);
+        }
+      }
+      if (iterationsLeft > 0) {
+        // sleep so the NMs can heartbeat before the next poll
+        sleep(100);
+      }
+    }
 
-      amClient.init(conf);
-      amClient.start();
+    updateMetrics("After Promotion");
+
+    assertEquals(1, updatedContainers.size());
+    for (ContainerId cId : allocatedOpportContainers.keySet()) {
+      Container orig = allocatedOpportContainers.get(cId);
+      UpdatedContainer updatedContainer = updatedContainers.get(cId);
+      assertNotNull(updatedContainer);
+      assertEquals(ExecutionType.GUARANTEED,
+          updatedContainer.getContainer().getExecutionType());
+      assertEquals(orig.getResource(),
+          updatedContainer.getContainer().getResource());
+      assertEquals(orig.getNodeId(),
+          updatedContainer.getContainer().getNodeId());
+      assertEquals(orig.getVersion() + 1,
+          updatedContainer.getContainer().getVersion());
+    }
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+    amClient.ask.clear();
+  }
 
-      amClient.registerApplicationMaster("Host", 10000, "");
+  @Test(timeout = 60000)
+  public void testDemotionFromAcquired() throws YarnException, IOException {
+    // Demotion test: ask for a single GUARANTEED container (priority3)
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
 
-      testOpportunisticAllocation(
-          (AMRMClientImpl<AMRMClient.ContainerRequest>) amClient);
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, null, null, priority3));
 
-      testAllocation((AMRMClientImpl<AMRMClient.ContainerRequest>)amClient);
+    int guarContainersRequestedAny = amClient.getTable(0).get(priority3,
+        ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+        .remoteRequest.getNumContainers();
 
-      amClient
-          .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
-              null);
+    assertEquals(1, guarContainersRequestedAny);
 
-    } finally {
-      if (amClient != null &&
-          amClient.getServiceState() == Service.STATE.STARTED) {
-        amClient.stop();
+    assertEquals(1, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    // poll allocate() (at most 50 times) until the container is allocated
+    int allocatedContainerCount = 0;
+    Map<ContainerId, Container> allocatedGuarContainers = new HashMap<>();
+    int iterationsLeft = 50;
+
+    amClient.getNMTokenCache().clearCache();
+    Assert.assertEquals(0,
+        amClient.getNMTokenCache().numberOfTokensInCache());
+    HashMap<String, Token> receivedNMTokens = new HashMap<>();
+
+    updateMetrics("Before Guar Allocation");
+
+    while (allocatedContainerCount < guarContainersRequestedAny
+        && iterationsLeft-- > 0) {
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      assertEquals(0, amClient.ask.size());
+      assertEquals(0, amClient.release.size());
+
+      allocatedContainerCount +=
+          allocResponse.getAllocatedContainers().size();
+      for (Container container : allocResponse.getAllocatedContainers()) {
+        if (container.getExecutionType() == ExecutionType.GUARANTEED) {
+          allocatedGuarContainers.put(container.getId(), container);
+          removeCR(container);
+        }
+      }
+
+      for (NMToken token : allocResponse.getNMTokens()) {
+        String nodeID = token.getNodeId().toString();
+        receivedNMTokens.put(nodeID, token.getToken());
+      }
+
+      if (allocatedContainerCount < guarContainersRequestedAny) {
+        // sleep so the NMs can heartbeat to the RM and trigger allocations
+        sleep(100);
+      }
+    }
+
+    assertEquals(guarContainersRequestedAny, allocatedContainerCount);
+    assertEquals(guarContainersRequestedAny, allocatedGuarContainers.size());
+
+    updateMetrics("After Guar Allocation / Before Demotion");
+
+    try {
+      Container c = allocatedGuarContainers.values().iterator().next();
+      amClient.requestContainerUpdate(
+          c, UpdateContainerRequest.newInstance(c.getVersion(),
+              c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
+              null, ExecutionType.GUARANTEED));
+      Assert.fail("Should throw Exception..");
+    } catch (IllegalArgumentException e) {
+      System.out.println("## " + e.getMessage());
+      Assert.assertTrue(e.getMessage().contains(
+          "target should be OPPORTUNISTIC and original should be GUARANTEED"));
+    }
+
+    Container c = allocatedGuarContainers.values().iterator().next();
+    amClient.requestContainerUpdate(
+        c, UpdateContainerRequest.newInstance(c.getVersion(),
+            c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
+            null, ExecutionType.OPPORTUNISTIC));
+    iterationsLeft = 120;
+    Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>();
+    // poll allocate() (at most 120 times) until an updated container arrives
+    while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
+      // allocate() presumably carries the pending update request to the RM
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      // collect every container the RM reports as updated in this response
+      if (allocResponse.getUpdatedContainers() != null) {
+        for (UpdatedContainer updatedContainer : allocResponse
+            .getUpdatedContainers()) {
+          System.out.println("Got update..");
+          updatedContainers.put(updatedContainer.getContainer().getId(),
+              updatedContainer);
+        }
+      }
+      if (iterationsLeft > 0) {
+        // sleep so the NMs can heartbeat before the next poll
+        sleep(100);
       }
     }
+
+    updateMetrics("After Demotion");
+
+    assertEquals(1, updatedContainers.size());
+    for (ContainerId cId : allocatedGuarContainers.keySet()) {
+      Container orig = allocatedGuarContainers.get(cId);
+      UpdatedContainer updatedContainer = updatedContainers.get(cId);
+      assertNotNull(updatedContainer);
+      assertEquals(ExecutionType.OPPORTUNISTIC,
+          updatedContainer.getContainer().getExecutionType());
+      assertEquals(orig.getResource(),
+          updatedContainer.getContainer().getResource());
+      assertEquals(orig.getNodeId(),
+          updatedContainer.getContainer().getNodeId());
+      assertEquals(orig.getVersion() + 1,
+          updatedContainer.getContainer().getVersion());
+    }
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+    amClient.ask.clear();
   }
 
-  private void testAllocation(
-      final AMRMClientImpl<AMRMClient.ContainerRequest> amClient)
-      throws YarnException, IOException {
+  @Test(timeout = 60000)
+  public void testMixedAllocationAndRelease() throws YarnException,
+      IOException {
     // setup container request
     assertEquals(0, amClient.ask.size());
     assertEquals(0, amClient.release.size());
@@ -274,6 +531,28 @@ public class TestOpportunisticContainerAllocation {
             ExecutionTypeRequest.newInstance(
                 ExecutionType.OPPORTUNISTIC, true)));
 
+    int containersRequestedNode = amClient.getTable(0).get(priority,
+        node, ExecutionType.GUARANTEED, capability).remoteRequest
+        .getNumContainers();
+    int containersRequestedRack = amClient.getTable(0).get(priority,
+        rack, ExecutionType.GUARANTEED, capability).remoteRequest
+        .getNumContainers();
+    int containersRequestedAny = amClient.getTable(0).get(priority,
+        ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+        .remoteRequest.getNumContainers();
+    int oppContainersRequestedAny =
+        amClient.getTable(0).get(priority2, ResourceRequest.ANY,
+            ExecutionType.OPPORTUNISTIC, capability).remoteRequest
+            .getNumContainers();
+
+    assertEquals(4, containersRequestedNode);
+    assertEquals(4, containersRequestedRack);
+    assertEquals(4, containersRequestedAny);
+    assertEquals(2, oppContainersRequestedAny);
+
+    assertEquals(4, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
     amClient.removeContainerRequest(
         new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
     amClient.removeContainerRequest(
@@ -284,16 +563,16 @@ public class TestOpportunisticContainerAllocation {
             ExecutionTypeRequest.newInstance(
                 ExecutionType.OPPORTUNISTIC, true)));
 
-    int containersRequestedNode = amClient.getTable(0).get(priority,
+    containersRequestedNode = amClient.getTable(0).get(priority,
         node, ExecutionType.GUARANTEED, capability).remoteRequest
         .getNumContainers();
-    int containersRequestedRack = amClient.getTable(0).get(priority,
+    containersRequestedRack = amClient.getTable(0).get(priority,
         rack, ExecutionType.GUARANTEED, capability).remoteRequest
         .getNumContainers();
-    int containersRequestedAny = amClient.getTable(0).get(priority,
+    containersRequestedAny = amClient.getTable(0).get(priority,
         ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
         .remoteRequest.getNumContainers();
-    int oppContainersRequestedAny =
+    oppContainersRequestedAny =
         amClient.getTable(0).get(priority2, ResourceRequest.ANY,
             ExecutionType.OPPORTUNISTIC, capability).remoteRequest
             .getNumContainers();
@@ -309,7 +588,7 @@ public class TestOpportunisticContainerAllocation {
     // RM should allocate container within 2 calls to allocate()
     int allocatedContainerCount = 0;
     int allocatedOpportContainerCount = 0;
-    int iterationsLeft = 10;
+    int iterationsLeft = 50;
     Set<ContainerId> releases = new TreeSet<>();
 
     amClient.getNMTokenCache().clearCache();
@@ -324,8 +603,8 @@ public class TestOpportunisticContainerAllocation {
       assertEquals(0, amClient.ask.size());
       assertEquals(0, amClient.release.size());
 
-      allocatedContainerCount += allocResponse.getAllocatedContainers()
-          .size();
+      allocatedContainerCount +=
+          allocResponse.getAllocatedContainers().size();
       for (Container container : allocResponse.getAllocatedContainers()) {
         if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
           allocatedOpportContainerCount++;
@@ -345,9 +624,9 @@ public class TestOpportunisticContainerAllocation {
       }
     }
 
-    assertEquals(allocatedContainerCount,
-        containersRequestedAny + oppContainersRequestedAny);
-    assertEquals(allocatedOpportContainerCount, oppContainersRequestedAny);
+    assertEquals(containersRequestedAny + oppContainersRequestedAny,
+        allocatedContainerCount);
+    assertEquals(oppContainersRequestedAny, allocatedOpportContainerCount);
     for (ContainerId rejectContainerId : releases) {
       amClient.releaseAssignedContainer(rejectContainerId);
     }
@@ -395,26 +674,25 @@ public class TestOpportunisticContainerAllocation {
   /**
    * Tests allocation with requests comprising only opportunistic containers.
    */
-  private void testOpportunisticAllocation(
-      final AMRMClientImpl<AMRMClient.ContainerRequest> amClient)
-      throws YarnException, IOException {
+  @Test(timeout = 60000)
+  public void testOpportunisticAllocation() throws YarnException, IOException {
     // setup container request
     assertEquals(0, amClient.ask.size());
     assertEquals(0, amClient.release.size());
 
     amClient.addContainerRequest(
-        new AMRMClient.ContainerRequest(capability, null, null, priority, 0,
+        new AMRMClient.ContainerRequest(capability, null, null, priority3, 0,
             true, null,
             ExecutionTypeRequest.newInstance(
                 ExecutionType.OPPORTUNISTIC, true)));
     amClient.addContainerRequest(
-        new AMRMClient.ContainerRequest(capability, null, null, priority, 0,
+        new AMRMClient.ContainerRequest(capability, null, null, priority3, 0,
             true, null,
             ExecutionTypeRequest.newInstance(
                 ExecutionType.OPPORTUNISTIC, true)));
 
     int oppContainersRequestedAny =
-        amClient.getTable(0).get(priority, ResourceRequest.ANY,
+        amClient.getTable(0).get(priority3, ResourceRequest.ANY,
             ExecutionType.OPPORTUNISTIC, capability).remoteRequest
             .getNumContainers();
 
@@ -456,9 +734,43 @@ public class TestOpportunisticContainerAllocation {
       }
     }
 
+    assertEquals(oppContainersRequestedAny, allocatedContainerCount);
     assertEquals(1, receivedNMTokens.values().size());
   }
 
+  /**
+   * Removes from the client every outstanding ANY-locality request that
+   * matches the given container's priority, execution type and resource.
+   *
+   * @param container the container that has just been allocated
+   */
+  private void removeCR(Container container) {
+    // Look requests up by the container's own execution type: this helper is
+    // also called for GUARANTEED allocations, where a hard-coded
+    // OPPORTUNISTIC lookup would find nothing to remove.
+    List<? extends Collection<AMRMClient.ContainerRequest>> matchingRequests =
+        amClient.getMatchingRequests(container.getPriority(),
+            ResourceRequest.ANY, container.getExecutionType(),
+            container.getResource());
+    // Collect first and remove afterwards, presumably so the collections
+    // handed out by the client are not modified while being iterated.
+    Set<AMRMClient.ContainerRequest> toRemove = new HashSet<>();
+    for (Collection<AMRMClient.ContainerRequest> rc : matchingRequests) {
+      toRemove.addAll(rc);
+    }
+    for (AMRMClient.ContainerRequest cr : toRemove) {
+      amClient.removeContainerRequest(cr);
+    }
+  }
+
+  /**
+   * Copies the root queue's available and allocated memory / vcores into the
+   * per-test fields and prints them to stdout under the given label.
+   *
+   * @param msg label identifying the point in the test being reported
+   */
+  private void updateMetrics(String msg) {
+    final AbstractYarnScheduler rmScheduler = (AbstractYarnScheduler)
+        yarnCluster.getResourceManager().getResourceScheduler();
+
+    availMB = rmScheduler.getRootQueueMetrics().getAvailableMB();
+    availVCores =
+        rmScheduler.getRootQueueMetrics().getAvailableVirtualCores();
+    allocMB = rmScheduler.getRootQueueMetrics().getAllocatedMB();
+    allocVCores =
+        rmScheduler.getRootQueueMetrics().getAllocatedVirtualCores();
+
+    // Build the values line piecewise; the text matches the concatenated
+    // form exactly, including the trailing separator.
+    final StringBuilder values = new StringBuilder(" : availMB=");
+    values.append(availMB).append(", ");
+    values.append("availVCores=").append(availVCores).append(", ");
+    values.append("allocMB=").append(allocMB).append(", ");
+    values.append("allocVCores=").append(allocVCores).append(", ");
+
+    System.out.println("## METRICS (" + msg + ")==>");
+    System.out.println(values.toString());
+    System.out.println("<== ##");
+  }
+
   private void sleep(int sleepTime) {
     try {
       Thread.sleep(sleepTime);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerErrorPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerErrorPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerErrorPBImpl.java
index fb6c1a7..8ff9d9e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerErrorPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerErrorPBImpl.java
@@ -74,6 +74,22 @@ public class UpdateContainerErrorPBImpl extends UpdateContainerError {
   }
 
   @Override
+  public int getCurrentContainerVersion() {
+    // Read from proto when viaProto is set, otherwise from builder.
+    final YarnServiceProtos.UpdateContainerErrorProtoOrBuilder source;
+    if (viaProto) {
+      source = proto;
+    } else {
+      source = builder;
+    }
+    // A record without the field set reports version 0.
+    return source.hasCurrentContainerVersion()
+        ? source.getCurrentContainerVersion()
+        : 0;
+  }
+
+  @Override
+  public void setCurrentContainerVersion(int containerVersion) {
+    // Stores the given container version on the builder.
+    // maybeInitBuilder() is called first, presumably so that builder is
+    // ready to be written to; its definition is outside this hunk.
+    maybeInitBuilder();
+    builder.setCurrentContainerVersion(containerVersion);
+  }
+
+  @Override
   public UpdateContainerRequest getUpdateContainerRequest() {
     YarnServiceProtos.UpdateContainerErrorProtoOrBuilder p = viaProto ? proto
         : builder;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index 94bfd58..224a1da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -188,19 +188,25 @@ public class RMServerUtils {
           }
         }
       }
-      checkAndcreateUpdateError(updateErrors, updateReq, msg);
+      checkAndcreateUpdateError(updateErrors, updateReq, rmContainer, msg);
     }
     return updateRequests;
   }
 
   private static void checkAndcreateUpdateError(
       List<UpdateContainerError> errors, UpdateContainerRequest updateReq,
-      String msg) {
+      RMContainer rmContainer, String msg) {
+    // Appends an UpdateContainerError for updateReq to errors when msg is
+    // non-null; with a null msg nothing is added.
     if (msg != null) {
       UpdateContainerError updateError = RECORD_FACTORY
           .newRecordInstance(UpdateContainerError.class);
       updateError.setReason(msg);
       updateError.setUpdateContainerRequest(updateReq);
+      // Attach the version currently held by the RM's container so the
+      // caller can read it from the error; -1 is set when rmContainer is
+      // null.
+      if (rmContainer != null) {
+        updateError.setCurrentContainerVersion(
+            rmContainer.getContainer().getVersion());
+      } else {
+        updateError.setCurrentContainerVersion(-1);
+      }
       errors.add(updateError);
     }
   }
@@ -216,9 +222,7 @@ public class RMServerUtils {
     // version
     if (msg == null && updateReq.getContainerVersion() !=
         rmContainer.getContainer().getVersion()) {
-      msg = INCORRECT_CONTAINER_VERSION_ERROR + "|"
-          + updateReq.getContainerVersion() + "|"
-          + rmContainer.getContainer().getVersion();
+      msg = INCORRECT_CONTAINER_VERSION_ERROR;
     }
     // No more than 1 container update per request.
     if (msg == null &&

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
index 641ef64..b083642 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -251,8 +251,11 @@ public class TestOpportunisticContainerAllocatorAMService {
 
     Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
     Assert.assertEquals(1, allocateResponse.getUpdateErrors().size());
-    Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR|1|0",
+    Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR",
         allocateResponse.getUpdateErrors().get(0).getReason());
+    Assert.assertEquals(0,
+        allocateResponse.getUpdateErrors().get(0)
+            .getCurrentContainerVersion());
     Assert.assertEquals(container.getId(),
         allocateResponse.getUpdateErrors().get(0)
             .getUpdateContainerRequest().getContainerId());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
index c5829cf..74cecf2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
@@ -275,8 +275,10 @@ public class TestIncreaseAllocationExpirer {
             Resources.createResource(5 * GB), null)));
     List<UpdateContainerError> updateErrors = response.getUpdateErrors();
     Assert.assertEquals(1, updateErrors.size());
-    Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR|0|1",
+    Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR",
         updateErrors.get(0).getReason());
+    Assert.assertEquals(1,
+        updateErrors.get(0).getCurrentContainerVersion());
 
     // am1 asks to change containerId2 from 3GB to 5GB
     am1.sendContainerResizingRequest(Collections.singletonList(


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org