You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/01/17 06:47:25 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-916] Make ContainerLaunchContext instantiation in YarnService more efficient
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new adb0efc [GOBBLIN-916] Make ContainerLaunchContext instantiation in YarnService more efficient
adb0efc is described below
commit adb0efcc20a306d16b2f59d89c189539bd20dc2d
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Thu Jan 16 22:47:19 2020 -0800
[GOBBLIN-916] Make ContainerLaunchContext instantiation in YarnService more efficient
Closes #2770 from ZihanLi58/GOBBLIN-916
---
.../hive/metastore/HiveMetaStoreBasedRegister.java | 2 +-
.../java/org/apache/gobblin/yarn/YarnService.java | 18 ++-
.../org/apache/gobblin/yarn/YarnServiceTest.java | 3 +-
...est.java => YarnServiceTestWithExpiration.java} | 174 ++++++---------------
4 files changed, 60 insertions(+), 137 deletions(-)
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
index 15f5982..02d38dc 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
@@ -133,7 +133,7 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
* when the first time a table/database is loaded into the cache, whether they existed on the remote hiveMetaStore side.
*/
CacheLoader<String, Boolean> cacheLoader = new CacheLoader<String, Boolean>() {
- @Override
+ @Override
public Boolean load(String key) throws Exception {
return true;
}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 1d67b9c..4910a5f 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -32,10 +32,9 @@ import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-
+import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -217,7 +216,7 @@ public class YarnService extends AbstractIdleService {
this.amrmClientAsync = closer.register(
AMRMClientAsync.createAMRMClientAsync(1000, new AMRMClientCallbackHandler()));
this.amrmClientAsync.init(this.yarnConfiguration);
- this.nmClientAsync = closer.register(NMClientAsync.createNMClientAsync(new NMClientCallbackHandler()));
+ this.nmClientAsync = closer.register(NMClientAsync.createNMClientAsync(getNMClientCallbackHandler()));
this.nmClientAsync.init(this.yarnConfiguration);
this.initialContainers = config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY);
@@ -231,7 +230,7 @@ public class YarnService extends AbstractIdleService {
Optional.of(config.getString(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY)) :
Optional.<String>absent();
- this.containerLaunchExecutor = Executors.newFixedThreadPool(10,
+ this.containerLaunchExecutor = ScalingThreadPoolExecutor.newScalingThreadPool(5, Integer.MAX_VALUE, 0L,
ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), Optional.of("ContainerLaunchExecutor")));
this.tokens = getSecurityTokens();
@@ -282,6 +281,10 @@ public class YarnService extends AbstractIdleService {
}));
}
+ protected NMClientCallbackHandler getNMClientCallbackHandler() {
+ return new NMClientCallbackHandler();
+ }
+
@SuppressWarnings("unused")
@Subscribe
public void handleContainerShutdownRequest(ContainerShutdownRequest containerShutdownRequest) {
@@ -608,8 +611,9 @@ public class YarnService extends AbstractIdleService {
* preempted by the ResourceManager, or 4) the container gets stopped by the ApplicationMaster.
* A replacement container is needed in all but the last case.
*/
- private void handleContainerCompletion(ContainerStatus containerStatus) {
+ protected void handleContainerCompletion(ContainerStatus containerStatus) {
Map.Entry<Container, String> completedContainerEntry = this.containerMap.remove(containerStatus.getContainerId());
+
String completedInstanceName = completedContainerEntry.getValue();
LOGGER.info(String.format("Container %s running Helix instance %s has completed with exit status %d",
@@ -796,7 +800,7 @@ public class YarnService extends AbstractIdleService {
/**
* A custom implementation of {@link NMClientAsync.CallbackHandler}.
*/
- private class NMClientCallbackHandler implements NMClientAsync.CallbackHandler {
+ class NMClientCallbackHandler implements NMClientAsync.CallbackHandler {
@Override
public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
@@ -826,7 +830,6 @@ public class YarnService extends AbstractIdleService {
}
LOGGER.info(String.format("Container %s has been stopped", containerId));
- containerMap.remove(containerId);
if (containerMap.isEmpty()) {
synchronized (allContainersStopped) {
allContainersStopped.notify();
@@ -843,7 +846,6 @@ public class YarnService extends AbstractIdleService {
}
LOGGER.error(String.format("Failed to start container %s due to error %s", containerId, t));
- containerMap.remove(containerId);
}
@Override
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
index 3771994..2eb032f 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
@@ -279,8 +279,7 @@ public class YarnServiceTest {
Assert.assertTrue(command.contains("-Xmx1628"));
}
-
- private static class TestYarnService extends YarnService {
+ static class TestYarnService extends YarnService {
public TestYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
FileSystem fs, EventBus eventBus) throws Exception {
super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus);
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
similarity index 58%
copy from gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
copy to gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
index 3771994..564647c 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.yarn;
import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
import com.google.common.eventbus.EventBus;
import com.google.common.io.Closer;
import com.typesafe.config.Config;
@@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -68,13 +70,13 @@ import org.testng.annotations.Test;
/**
* Tests for {@link YarnService}.
*/
-@Test(groups = {"gobblin.yarn", "disabledOnTravis"}, singleThreaded=true)
-public class YarnServiceTest {
+@Test(groups = {"gobblin.yarn", "disabledOnTravis"})
+public class YarnServiceTestWithExpiration {
final Logger LOG = LoggerFactory.getLogger(YarnServiceTest.class);
private YarnClient yarnClient;
private MiniYARNCluster yarnCluster;
- private TestYarnService yarnService;
+ private TestExpiredYarnService expiredYarnService;
private Config config;
private YarnConfiguration clusterConf;
private ApplicationId applicationId;
@@ -106,6 +108,7 @@ public class YarnServiceTest {
this.clusterConf.set(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, "100");
this.clusterConf.set(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, "10000");
this.clusterConf.set(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS, "60000");
+ this.clusterConf.set(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, "1000");
this.yarnCluster =
this.closer.register(new MiniYARNCluster("YarnServiceTestCluster", 4, 1,
@@ -135,11 +138,11 @@ public class YarnServiceTest {
startApp();
// create and start the test yarn service
- this.yarnService = new TestYarnService(this.config, "testApp", "appId",
+ this.expiredYarnService = new TestExpiredYarnService(this.config, "testApp", "appId",
this.clusterConf,
FileSystem.getLocal(new Configuration()), this.eventBus);
- this.yarnService.startUp();
+ this.expiredYarnService.startUp();
}
private void startApp() throws Exception {
@@ -187,149 +190,68 @@ public class YarnServiceTest {
public void tearDown() throws IOException, TimeoutException, YarnException {
try {
this.yarnClient.killApplication(this.applicationAttemptId.getApplicationId());
- this.yarnService.shutDown();
+ this.expiredYarnService.shutDown();
+ Assert.assertEquals(this.expiredYarnService.getContainerMap().size(), 0);
} finally {
this.closer.close();
}
}
/**
- * Test that the dynamic config is added to the config specified when the {@link GobblinApplicationMaster}
- * is instantiated.
+ * Test that the yarn service can correctly handle onStartContainerError
*/
- @Test(groups = {"gobblin.yarn", "disabledOnTravis"})
- public void testScaleUp() {
- this.yarnService.requestTargetNumberOfContainers(10, Collections.EMPTY_SET);
-
- Assert.assertFalse(this.yarnService.getMatchingRequestsList(64, 1).isEmpty());
- Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 10);
- Assert.assertTrue(this.yarnService.waitForContainerCount(10, 60000));
- // container request list that had entries earlier should now be empty
- Assert.assertEquals(this.yarnService.getMatchingRequestsList(64, 1).size(), 0);
- }
+ @Test(groups = {"gobblin.yarn", "disabledOnTravis"})
+ public void testStartError() throws Exception{
+ this.expiredYarnService.requestTargetNumberOfContainers(10, Collections.EMPTY_SET);
- @Test(groups = {"gobblin.yarn", "disabledOnTravis"}, dependsOnMethods = "testScaleUp")
- public void testScaleDownWithInUseInstances() {
- Set<String> inUseInstances = new HashSet<>();
+ Assert.assertFalse(this.expiredYarnService.getMatchingRequestsList(64, 1).isEmpty());
+ Assert.assertEquals(this.expiredYarnService.getNumRequestedContainers(), 10);
- for (int i = 1; i <= 8; i++) {
- inUseInstances.add("GobblinYarnTaskRunner_" + i);
+ try {
+ Thread.sleep(20000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
-
- this.yarnService.requestTargetNumberOfContainers(6, inUseInstances);
-
- Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 6);
-
- // will only be able to shrink to 8
- Assert.assertTrue(this.yarnService.waitForContainerCount(8, 60000));
-
- // will not be able to shrink to 6 due to 8 in-use instances
- Assert.assertFalse(this.yarnService.waitForContainerCount(6, 10000));
-
- }
-
- @Test(groups = {"gobblin.yarn", "disabledOnTravis"}, dependsOnMethods = "testScaleDownWithInUseInstances")
- public void testScaleDown() throws Exception {
- this.yarnService.requestTargetNumberOfContainers(4, Collections.EMPTY_SET);
-
- Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 4);
- Assert.assertTrue(this.yarnService.waitForContainerCount(4, 60000));
- }
-
- // Keep this test last since it interferes with the container counts in the prior tests.
- @Test(groups = {"gobblin.yarn", "disabledOnTravis"}, dependsOnMethods = "testScaleDown")
- public void testReleasedContainerCache() throws Exception {
- Config modifiedConfig = this.config
- .withValue(GobblinYarnConfigurationKeys.RELEASED_CONTAINERS_CACHE_EXPIRY_SECS, ConfigValueFactory.fromAnyRef("2"));
- TestYarnService yarnService =
- new TestYarnService(modifiedConfig, "testApp1", "appId1",
- this.clusterConf, FileSystem.getLocal(new Configuration()), this.eventBus);
-
- ContainerId containerId1 = ContainerId.newInstance(ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 0),
- 0), 0);
-
- yarnService.getReleasedContainerCache().put(containerId1, "");
-
- Assert.assertTrue(yarnService.getReleasedContainerCache().getIfPresent(containerId1) != null);
-
- // give some time for element to expire
- Thread.sleep(4000);
- Assert.assertTrue(yarnService.getReleasedContainerCache().getIfPresent(containerId1) == null);
+ // Since the service may retry requesting and starting containers, the number may be larger than 10
+ Assert.assertTrue(this.expiredYarnService.completedContainers.size() >= 10);
+ Assert.assertTrue(this.expiredYarnService.startErrorContainers.size() >= 10);
}
- @Test(groups = {"gobblin.yarn", "disabledOnTravis"}, dependsOnMethods = "testReleasedContainerCache")
- public void testBuildContainerCommand() throws Exception {
- Config modifiedConfig = this.config
- .withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY, ConfigValueFactory.fromAnyRef("10"))
- .withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, ConfigValueFactory.fromAnyRef("0.8"));
-
- TestYarnService yarnService =
- new TestYarnService(modifiedConfig, "testApp2", "appId2",
- this.clusterConf, FileSystem.getLocal(new Configuration()), this.eventBus);
-
- ContainerId containerId = ContainerId.newInstance(ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 0),
- 0), 0);
- Resource resource = Resource.newInstance(2048, 1);
- Container container = Container.newInstance(containerId, null, null, resource, null, null);
-
- String command = yarnService.buildContainerCommand(container, "helixInstance1");
-
- // 1628 is from 2048 * 0.8 - 10
- Assert.assertTrue(command.contains("-Xmx1628"));
- }
-
-
- private static class TestYarnService extends YarnService {
- public TestYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
+ private static class TestExpiredYarnService extends YarnServiceTest.TestYarnService {
+ public HashSet<ContainerId> startErrorContainers = new HashSet<>();
+ public HashSet<ContainerStatus> completedContainers = new HashSet<>();
+ public TestExpiredYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
FileSystem fs, EventBus eventBus) throws Exception {
super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus);
}
- protected ContainerLaunchContext newContainerLaunchContext(Container container, String helixInstanceName)
- throws IOException {
- return BuilderUtils.newContainerLaunchContext(Collections.emptyMap(), Collections.emptyMap(),
- Arrays.asList("sleep", "60000"), Collections.emptyMap(), null, Collections.emptyMap());
+ @Override
+ protected NMClientCallbackHandler getNMClientCallbackHandler() {
+ return new TestNMClientCallbackHandler();
}
- /**
- * Get the list of matching container requests for the specified resource memory and cores.
- */
- public List<? extends Collection<AMRMClient.ContainerRequest>> getMatchingRequestsList(int memory, int cores) {
- Resource resource = Resource.newInstance(memory, cores);
- Priority priority = Priority.newInstance(0);
-
- return getAmrmClientAsync().getMatchingRequests(priority, ResourceRequest.ANY, resource);
+ @Override
+ protected void handleContainerCompletion(ContainerStatus containerStatus){
+ super.handleContainerCompletion(containerStatus);
+ completedContainers.add(containerStatus);
}
- /**
- * Wait to reach the expected count.
- *
- * @param expectedCount the expected count
- * @param waitMillis amount of time in milliseconds to wait
- * @return true if the count was reached within the allowed wait time
- */
- public boolean waitForContainerCount(int expectedCount, int waitMillis) {
- final int waitInterval = 1000;
- int waitedMillis = 0;
- boolean success = false;
-
- while (waitedMillis < waitMillis) {
- try {
- Thread.sleep(waitInterval);
- waitedMillis += waitInterval;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- break;
- }
-
- if (expectedCount == getContainerMap().size()) {
- success = true;
- break;
- }
+ protected ContainerLaunchContext newContainerLaunchContext(Container container, String helixInstanceName)
+ throws IOException {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ return BuilderUtils.newContainerLaunchContext(Collections.emptyMap(), Collections.emptyMap(),
+ Arrays.asList("sleep", "60000"), Collections.emptyMap(), null, Collections.emptyMap());
+ }
+ private class TestNMClientCallbackHandler extends YarnService.NMClientCallbackHandler {
+ @Override
+ public void onStartContainerError(ContainerId containerId, Throwable t) {
+ startErrorContainers.add(containerId);
}
-
- return success;
}
}
}