You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/05/21 16:26:23 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-777] Remove container request after container allocation

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 027321d  [GOBBLIN-777] Remove container request after container allocation
027321d is described below

commit 027321d6c76390d56be5dae0876cd5e6ca4118f5
Author: Hung Tran <hu...@linkedin.com>
AuthorDate: Tue May 21 09:25:48 2019 -0700

    [GOBBLIN-777] Remove container request after container allocation
    
    Closes #2641 from
    htran1/yarn_remove_request_on_allocation
---
 .../java/org/apache/gobblin/yarn/YarnService.java  | 27 ++++++++++++++++++++++
 .../org/apache/gobblin/yarn/YarnServiceTest.java   | 18 +++++++++++++++
 2 files changed, 45 insertions(+)

diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index f3d4e53..ce53075 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -55,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
@@ -133,6 +135,8 @@ public class YarnService extends AbstractIdleService {
   private final Optional<GobblinMetrics> gobblinMetrics;
   private final Optional<EventSubmitter> eventSubmitter;
 
+  @VisibleForTesting
+  @Getter(AccessLevel.PROTECTED)
   private final AMRMClientAsync<AMRMClient.ContainerRequest> amrmClientAsync;
   private final NMClientAsync nmClientAsync;
   private final ExecutorService containerLaunchExecutor;
@@ -702,6 +706,29 @@ public class YarnService extends AbstractIdleService {
         final String finalInstanceName = instanceName;
         containerMap.put(container.getId(), new AbstractMap.SimpleImmutableEntry<>(container, finalInstanceName));
 
+        // Find a request matching this allocation and remove it, to reduce the chance that a subsequent
+        // request will ask for extra containers. YARN does not have a delta request API and the requests
+        // are not cleaned up automatically.
+        // Try to match first with the container's node HTTP address as the resource name, then fall back
+        // to ResourceRequest.ANY. See YARN-1902.
+        List<? extends Collection<AMRMClient.ContainerRequest>> matchingRequests = amrmClientAsync
+            .getMatchingRequests(container.getPriority(), container.getNodeHttpAddress(), container.getResource());
+
+        if (matchingRequests.isEmpty()) {
+          LOGGER.debug("Matching request by host {} not found", container.getNodeHttpAddress());
+
+          matchingRequests = amrmClientAsync
+              .getMatchingRequests(container.getPriority(), ResourceRequest.ANY, container.getResource());
+        }
+
+        if (!matchingRequests.isEmpty()) {
+          AMRMClient.ContainerRequest firstMatchingContainerRequest = matchingRequests.get(0).iterator().next();
+          LOGGER.debug("Found matching requests {}, removing first matching request {}",
+              matchingRequests, firstMatchingContainerRequest);
+
+          amrmClientAsync.removeContainerRequest(firstMatchingContainerRequest);
+        }
+
         containerLaunchExecutor.submit(new Runnable() {
           @Override
           public void run() {
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
index 29cd68d..3771994 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
@@ -28,8 +28,10 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.URL;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
@@ -46,7 +48,9 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -197,8 +201,12 @@ public class YarnServiceTest {
   public void testScaleUp() {
     this.yarnService.requestTargetNumberOfContainers(10, Collections.EMPTY_SET);
 
+    Assert.assertFalse(this.yarnService.getMatchingRequestsList(64, 1).isEmpty());
     Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 10);
     Assert.assertTrue(this.yarnService.waitForContainerCount(10, 60000));
+
+    // after the wait for 10 containers, the matching request list that was non-empty above should be empty
+    Assert.assertEquals(this.yarnService.getMatchingRequestsList(64, 1).size(), 0);
   }
 
   @Test(groups = {"gobblin.yarn", "disabledOnTravis"}, dependsOnMethods = "testScaleUp")
@@ -285,6 +293,16 @@ public class YarnServiceTest {
     }
 
     /**
+     * Get the container requests matching priority 0, any resource name, and the given memory and cores.
+     */
+    public List<? extends Collection<AMRMClient.ContainerRequest>> getMatchingRequestsList(int memory, int cores) {
+      Resource resource = Resource.newInstance(memory, cores);
+      Priority priority = Priority.newInstance(0);
+
+      return getAmrmClientAsync().getMatchingRequests(priority, ResourceRequest.ANY, resource);
+    }
+
+    /**
      * Wait to reach the expected count.
      *
      * @param expectedCount the expected count