You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/05/21 16:26:23 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-777] Remove container request after container allocation
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 027321d [GOBBLIN-777] Remove container request after container allocation
027321d is described below
commit 027321d6c76390d56be5dae0876cd5e6ca4118f5
Author: Hung Tran <hu...@linkedin.com>
AuthorDate: Tue May 21 09:25:48 2019 -0700
[GOBBLIN-777] Remove container request after container allocation
Closes #2641 from htran1/yarn_remove_request_on_allocation
---
.../java/org/apache/gobblin/yarn/YarnService.java | 27 ++++++++++++++++++++++
.../org/apache/gobblin/yarn/YarnServiceTest.java | 18 +++++++++++++++
2 files changed, 45 insertions(+)
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index f3d4e53..ce53075 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -55,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
@@ -133,6 +135,8 @@ public class YarnService extends AbstractIdleService {
private final Optional<GobblinMetrics> gobblinMetrics;
private final Optional<EventSubmitter> eventSubmitter;
+ @VisibleForTesting
+ @Getter(AccessLevel.PROTECTED)
private final AMRMClientAsync<AMRMClient.ContainerRequest> amrmClientAsync;
private final NMClientAsync nmClientAsync;
private final ExecutorService containerLaunchExecutor;
@@ -702,6 +706,29 @@ public class YarnService extends AbstractIdleService {
final String finalInstanceName = instanceName;
containerMap.put(container.getId(), new AbstractMap.SimpleImmutableEntry<>(container, finalInstanceName));
+ // Find matching requests and remove the request to reduce the chance that a subsequent request
+ // will request extra containers. YARN does not have a delta request API and the requests are not
+ // cleaned up automatically.
+ // Try finding a match first with the host as the resource name then fall back to any resource match.
+ // See YARN-1902.
+ List<? extends Collection<AMRMClient.ContainerRequest>> matchingRequests = amrmClientAsync
+ .getMatchingRequests(container.getPriority(), container.getNodeHttpAddress(), container.getResource());
+
+ if (matchingRequests.isEmpty()) {
+ LOGGER.debug("Matching request by host {} not found", container.getNodeHttpAddress());
+
+ matchingRequests = amrmClientAsync
+ .getMatchingRequests(container.getPriority(), ResourceRequest.ANY, container.getResource());
+ }
+
+ if (!matchingRequests.isEmpty()) {
+ AMRMClient.ContainerRequest firstMatchingContainerRequest = matchingRequests.get(0).iterator().next();
+ LOGGER.debug("Found matching requests {}, removing first matching request {}",
+ matchingRequests, firstMatchingContainerRequest);
+
+ amrmClientAsync.removeContainerRequest(firstMatchingContainerRequest);
+ }
+
containerLaunchExecutor.submit(new Runnable() {
@Override
public void run() {
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
index 29cd68d..3771994 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
@@ -28,8 +28,10 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URL;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
@@ -46,7 +48,9 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -197,8 +201,12 @@ public class YarnServiceTest {
public void testScaleUp() {
this.yarnService.requestTargetNumberOfContainers(10, Collections.EMPTY_SET);
+ Assert.assertFalse(this.yarnService.getMatchingRequestsList(64, 1).isEmpty());
Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 10);
Assert.assertTrue(this.yarnService.waitForContainerCount(10, 60000));
+
+ // container request list that had entries earlier should now be empty
+ Assert.assertEquals(this.yarnService.getMatchingRequestsList(64, 1).size(), 0);
}
@Test(groups = {"gobblin.yarn", "disabledOnTravis"}, dependsOnMethods = "testScaleUp")
@@ -285,6 +293,16 @@ public class YarnServiceTest {
}
/**
+ * Get the list of matching container requests for the specified resource memory and cores.
+ */
+ public List<? extends Collection<AMRMClient.ContainerRequest>> getMatchingRequestsList(int memory, int cores) {
+ Resource resource = Resource.newInstance(memory, cores);
+ Priority priority = Priority.newInstance(0);
+
+ return getAmrmClientAsync().getMatchingRequests(priority, ResourceRequest.ANY, resource);
+ }
+
+ /**
* Wait to reach the expected count.
*
* @param expectedCount the expected count