You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/01/17 06:47:25 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-916] Make ContainerLaunchContext instantiation in YarnService more efficient

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new adb0efc  [GOBBLIN-916] Make ContainerLaunchContext instantiation in YarnService more efficient
adb0efc is described below

commit adb0efcc20a306d16b2f59d89c189539bd20dc2d
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Thu Jan 16 22:47:19 2020 -0800

    [GOBBLIN-916] Make ContainerLaunchContext instantiation in YarnService more efficient
    
    Closes #2770 from ZihanLi58/GOBBLIN-916
---
 .../hive/metastore/HiveMetaStoreBasedRegister.java |   2 +-
 .../java/org/apache/gobblin/yarn/YarnService.java  |  18 ++-
 .../org/apache/gobblin/yarn/YarnServiceTest.java   |   3 +-
 ...est.java => YarnServiceTestWithExpiration.java} | 174 ++++++---------------
 4 files changed, 60 insertions(+), 137 deletions(-)

diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
index 15f5982..02d38dc 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
@@ -133,7 +133,7 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
    * when the first time a table/database is loaded into the cache, whether they existed on the remote hiveMetaStore side.
    */
   CacheLoader<String, Boolean> cacheLoader = new CacheLoader<String, Boolean>() {
-  @Override
+    @Override
     public Boolean load(String key) throws Exception {
       return true;
     }
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 1d67b9c..4910a5f 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -32,10 +32,9 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
+import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -217,7 +216,7 @@ public class YarnService extends AbstractIdleService {
     this.amrmClientAsync = closer.register(
         AMRMClientAsync.createAMRMClientAsync(1000, new AMRMClientCallbackHandler()));
     this.amrmClientAsync.init(this.yarnConfiguration);
-    this.nmClientAsync = closer.register(NMClientAsync.createNMClientAsync(new NMClientCallbackHandler()));
+    this.nmClientAsync = closer.register(NMClientAsync.createNMClientAsync(getNMClientCallbackHandler()));
     this.nmClientAsync.init(this.yarnConfiguration);
 
     this.initialContainers = config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY);
@@ -231,7 +230,7 @@ public class YarnService extends AbstractIdleService {
         Optional.of(config.getString(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY)) :
         Optional.<String>absent();
 
-    this.containerLaunchExecutor = Executors.newFixedThreadPool(10,
+    this.containerLaunchExecutor = ScalingThreadPoolExecutor.newScalingThreadPool(5, Integer.MAX_VALUE, 0L,
         ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), Optional.of("ContainerLaunchExecutor")));
 
     this.tokens = getSecurityTokens();
@@ -282,6 +281,10 @@ public class YarnService extends AbstractIdleService {
     }));
   }
 
+  protected NMClientCallbackHandler getNMClientCallbackHandler() {
+    return new NMClientCallbackHandler();
+  }
+
   @SuppressWarnings("unused")
   @Subscribe
   public void handleContainerShutdownRequest(ContainerShutdownRequest containerShutdownRequest) {
@@ -608,8 +611,9 @@ public class YarnService extends AbstractIdleService {
    * preempted by the ResourceManager, or 4) the container gets stopped by the ApplicationMaster.
    * A replacement container is needed in all but the last case.
    */
-  private void handleContainerCompletion(ContainerStatus containerStatus) {
+  protected void handleContainerCompletion(ContainerStatus containerStatus) {
     Map.Entry<Container, String> completedContainerEntry = this.containerMap.remove(containerStatus.getContainerId());
+
     String completedInstanceName = completedContainerEntry.getValue();
 
     LOGGER.info(String.format("Container %s running Helix instance %s has completed with exit status %d",
@@ -796,7 +800,7 @@ public class YarnService extends AbstractIdleService {
   /**
    * A custom implementation of {@link NMClientAsync.CallbackHandler}.
    */
-  private class NMClientCallbackHandler implements NMClientAsync.CallbackHandler {
+   class NMClientCallbackHandler implements NMClientAsync.CallbackHandler {
 
     @Override
     public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
@@ -826,7 +830,6 @@ public class YarnService extends AbstractIdleService {
       }
 
       LOGGER.info(String.format("Container %s has been stopped", containerId));
-      containerMap.remove(containerId);
       if (containerMap.isEmpty()) {
         synchronized (allContainersStopped) {
           allContainersStopped.notify();
@@ -843,7 +846,6 @@ public class YarnService extends AbstractIdleService {
       }
 
       LOGGER.error(String.format("Failed to start container %s due to error %s", containerId, t));
-      containerMap.remove(containerId);
     }
 
     @Override
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
index 3771994..2eb032f 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
@@ -279,8 +279,7 @@ public class YarnServiceTest {
     Assert.assertTrue(command.contains("-Xmx1628"));
   }
 
-
-  private static class TestYarnService extends YarnService {
+   static class TestYarnService extends YarnService {
     public TestYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
         FileSystem fs, EventBus eventBus) throws Exception {
       super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus);
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
similarity index 58%
copy from gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
copy to gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
index 3771994..564647c 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.yarn;
 
 import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
 import com.google.common.eventbus.EventBus;
 import com.google.common.io.Closer;
 import com.typesafe.config.Config;
@@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -68,13 +70,13 @@ import org.testng.annotations.Test;
 /**
  * Tests for {@link YarnService}.
  */
-@Test(groups = {"gobblin.yarn", "disabledOnTravis"}, singleThreaded=true)
-public class YarnServiceTest {
+@Test(groups = {"gobblin.yarn", "disabledOnTravis"})
+public class YarnServiceTestWithExpiration {
   final Logger LOG = LoggerFactory.getLogger(YarnServiceTest.class);
 
   private YarnClient yarnClient;
   private MiniYARNCluster yarnCluster;
-  private TestYarnService yarnService;
+  private TestExpiredYarnService expiredYarnService;
   private Config config;
   private YarnConfiguration clusterConf;
   private ApplicationId applicationId;
@@ -106,6 +108,7 @@ public class YarnServiceTest {
     this.clusterConf.set(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, "100");
     this.clusterConf.set(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, "10000");
     this.clusterConf.set(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS, "60000");
+    this.clusterConf.set(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, "1000");
 
     this.yarnCluster =
         this.closer.register(new MiniYARNCluster("YarnServiceTestCluster", 4, 1,
@@ -135,11 +138,11 @@ public class YarnServiceTest {
     startApp();
 
     // create and start the test yarn service
-    this.yarnService = new TestYarnService(this.config, "testApp", "appId",
+    this.expiredYarnService = new TestExpiredYarnService(this.config, "testApp", "appId",
         this.clusterConf,
         FileSystem.getLocal(new Configuration()), this.eventBus);
 
-   this.yarnService.startUp();
+    this.expiredYarnService.startUp();
   }
 
   private void startApp() throws Exception {
@@ -187,149 +190,68 @@ public class YarnServiceTest {
   public void tearDown() throws IOException, TimeoutException, YarnException {
     try {
       this.yarnClient.killApplication(this.applicationAttemptId.getApplicationId());
-      this.yarnService.shutDown();
+      this.expiredYarnService.shutDown();
+      Assert.assertEquals(this.expiredYarnService.getContainerMap().size(), 0);
     } finally {
       this.closer.close();
     }
   }
 
   /**
-   * Test that the dynamic config is added to the config specified when the {@link GobblinApplicationMaster}
-   * is instantiated.
+   * Test that the yarn service correctly handles onStartContainerError callbacks.
    */
-  @Test(groups = {"gobblin.yarn", "disabledOnTravis"})
-  public void testScaleUp() {
-    this.yarnService.requestTargetNumberOfContainers(10, Collections.EMPTY_SET);
-
-    Assert.assertFalse(this.yarnService.getMatchingRequestsList(64, 1).isEmpty());
-    Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 10);
-    Assert.assertTrue(this.yarnService.waitForContainerCount(10, 60000));
 
-    // container request list that had entries earlier should now be empty
-    Assert.assertEquals(this.yarnService.getMatchingRequestsList(64, 1).size(), 0);
-  }
+  @Test(groups = {"gobblin.yarn", "disabledOnTravis"})
+  public void testStartError() throws Exception{
+    this.expiredYarnService.requestTargetNumberOfContainers(10, Collections.EMPTY_SET);
 
-  @Test(groups = {"gobblin.yarn", "disabledOnTravis"}, dependsOnMethods = "testScaleUp")
-  public void testScaleDownWithInUseInstances() {
-    Set<String> inUseInstances = new HashSet<>();
+    Assert.assertFalse(this.expiredYarnService.getMatchingRequestsList(64, 1).isEmpty());
+    Assert.assertEquals(this.expiredYarnService.getNumRequestedContainers(), 10);
 
-    for (int i = 1; i <= 8; i++) {
-      inUseInstances.add("GobblinYarnTaskRunner_" + i);
+    try {
+      Thread.sleep(20000);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
     }
-
-    this.yarnService.requestTargetNumberOfContainers(6, inUseInstances);
-
-    Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 6);
-
-    // will only be able to shrink to 8
-    Assert.assertTrue(this.yarnService.waitForContainerCount(8, 60000));
-
-    // will not be able to shrink to 6 due to 8 in-use instances
-    Assert.assertFalse(this.yarnService.waitForContainerCount(6, 10000));
-
-  }
-
-  @Test(groups = {"gobblin.yarn", "disabledOnTravis"}, dependsOnMethods = "testScaleDownWithInUseInstances")
-  public void testScaleDown() throws Exception {
-    this.yarnService.requestTargetNumberOfContainers(4, Collections.EMPTY_SET);
-
-    Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 4);
-    Assert.assertTrue(this.yarnService.waitForContainerCount(4, 60000));
-  }
-
-  // Keep this test last since it interferes with the container counts in the prior tests.
-  @Test(groups = {"gobblin.yarn", "disabledOnTravis"}, dependsOnMethods = "testScaleDown")
-  public void testReleasedContainerCache() throws Exception {
-    Config modifiedConfig = this.config
-        .withValue(GobblinYarnConfigurationKeys.RELEASED_CONTAINERS_CACHE_EXPIRY_SECS, ConfigValueFactory.fromAnyRef("2"));
-    TestYarnService yarnService =
-        new TestYarnService(modifiedConfig, "testApp1", "appId1",
-            this.clusterConf, FileSystem.getLocal(new Configuration()), this.eventBus);
-
-    ContainerId containerId1 = ContainerId.newInstance(ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 0),
-        0), 0);
-
-    yarnService.getReleasedContainerCache().put(containerId1, "");
-
-    Assert.assertTrue(yarnService.getReleasedContainerCache().getIfPresent(containerId1) != null);
-
-    // give some time for element to expire
-    Thread.sleep(4000);
-    Assert.assertTrue(yarnService.getReleasedContainerCache().getIfPresent(containerId1) == null);
+    // The service may re-request and restart containers, so the counts may be larger than 10
+    Assert.assertTrue(this.expiredYarnService.completedContainers.size() >= 10);
+    Assert.assertTrue(this.expiredYarnService.startErrorContainers.size() >= 10);
   }
 
-  @Test(groups = {"gobblin.yarn", "disabledOnTravis"}, dependsOnMethods = "testReleasedContainerCache")
-  public void testBuildContainerCommand() throws Exception {
-    Config modifiedConfig = this.config
-        .withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY, ConfigValueFactory.fromAnyRef("10"))
-        .withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, ConfigValueFactory.fromAnyRef("0.8"));
-
-    TestYarnService yarnService =
-        new TestYarnService(modifiedConfig, "testApp2", "appId2",
-            this.clusterConf, FileSystem.getLocal(new Configuration()), this.eventBus);
-
-    ContainerId containerId = ContainerId.newInstance(ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 0),
-        0), 0);
-    Resource resource = Resource.newInstance(2048, 1);
-    Container container = Container.newInstance(containerId, null, null, resource, null, null);
-
-    String command = yarnService.buildContainerCommand(container, "helixInstance1");
-
-    // 1628 is from 2048 * 0.8 - 10
-    Assert.assertTrue(command.contains("-Xmx1628"));
-  }
-
-
-  private static class TestYarnService extends YarnService {
-    public TestYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
+  private static class TestExpiredYarnService extends YarnServiceTest.TestYarnService {
+    public HashSet<ContainerId> startErrorContainers = new HashSet<>();
+    public HashSet<ContainerStatus> completedContainers = new HashSet<>();
+    public TestExpiredYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
         FileSystem fs, EventBus eventBus) throws Exception {
       super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus);
     }
 
-    protected ContainerLaunchContext newContainerLaunchContext(Container container, String helixInstanceName)
-        throws IOException {
-      return BuilderUtils.newContainerLaunchContext(Collections.emptyMap(), Collections.emptyMap(),
-              Arrays.asList("sleep", "60000"), Collections.emptyMap(), null, Collections.emptyMap());
+    @Override
+    protected NMClientCallbackHandler getNMClientCallbackHandler() {
+      return new TestNMClientCallbackHandler();
     }
 
-    /**
-     * Get the list of matching container requests for the specified resource memory and cores.
-     */
-    public List<? extends Collection<AMRMClient.ContainerRequest>> getMatchingRequestsList(int memory, int cores) {
-      Resource resource = Resource.newInstance(memory, cores);
-      Priority priority = Priority.newInstance(0);
-
-      return getAmrmClientAsync().getMatchingRequests(priority, ResourceRequest.ANY, resource);
+    @Override
+    protected void handleContainerCompletion(ContainerStatus containerStatus){
+      super.handleContainerCompletion(containerStatus);
+      completedContainers.add(containerStatus);
     }
 
-    /**
-     * Wait to reach the expected count.
-     *
-     * @param expectedCount the expected count
-     * @param waitMillis amount of time in milliseconds to wait
-     * @return true if the count was reached within the allowed wait time
-     */
-    public boolean waitForContainerCount(int expectedCount, int waitMillis) {
-      final int waitInterval = 1000;
-      int waitedMillis = 0;
-      boolean success = false;
-
-      while (waitedMillis < waitMillis) {
-        try {
-          Thread.sleep(waitInterval);
-          waitedMillis += waitInterval;
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          break;
-        }
-
-        if (expectedCount == getContainerMap().size()) {
-          success = true;
-          break;
-        }
+    protected ContainerLaunchContext newContainerLaunchContext(Container container, String helixInstanceName)
+        throws IOException {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      return BuilderUtils.newContainerLaunchContext(Collections.emptyMap(), Collections.emptyMap(),
+          Arrays.asList("sleep", "60000"), Collections.emptyMap(), null, Collections.emptyMap());
+    }
+    private class TestNMClientCallbackHandler extends YarnService.NMClientCallbackHandler {
+      @Override
+      public void onStartContainerError(ContainerId containerId, Throwable t) {
+        startErrorContainers.add(containerId);
       }
-
-      return success;
     }
   }
 }