You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/22 03:45:08 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-743] Initialize Gobblin application master services with dyn…

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 074ebc9  [GOBBLIN-743] Initialize Gobblin application master services with dyn…
074ebc9 is described below

commit 074ebc9e88ede3b3693fe88d7b105449ebf906e0
Author: Hung Tran <hu...@linkedin.com>
AuthorDate: Sun Apr 21 20:43:18 2019 -0700

    [GOBBLIN-743] Initialize Gobblin application master services with dyn…
    
    Closes #2610 from htran1/app_master_dynamic_config
---
 .../gobblin/cluster/GobblinClusterManager.java     |  2 +-
 .../gobblin/yarn/GobblinApplicationMaster.java     |  6 +-
 .../gobblin/yarn/GobblinYarnAppLauncherTest.java   | 87 ++++++++++++++++++++++
 3 files changed, 91 insertions(+), 4 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 93a6712..a4658cc 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -136,7 +136,7 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
   private JobConfigurationManager jobConfigurationManager;
 
   private final String clusterName;
-  private final Config config;
+  protected final Config config;
 
   public GobblinClusterManager(String clusterName, String applicationId, Config config,
       Optional<Path> appWorkDirOptional) throws Exception {
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index fa7ee54..c4ee07a 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -79,15 +79,15 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
     GobblinYarnLogSource gobblinYarnLogSource = new GobblinYarnLogSource();
     if (gobblinYarnLogSource.isLogSourcePresent()) {
       this.applicationLauncher
-          .addService(gobblinYarnLogSource.buildLogCopier(config, containerId, this.fs, this.appWorkDir));
+          .addService(gobblinYarnLogSource.buildLogCopier(this.config, containerId, this.fs, this.appWorkDir));
     }
 
     this.applicationLauncher
-        .addService(buildYarnService(config, applicationName, this.applicationId, yarnConfiguration, this.fs));
+        .addService(buildYarnService(this.config, applicationName, this.applicationId, yarnConfiguration, this.fs));
 
     if (UserGroupInformation.isSecurityEnabled()) {
       LOGGER.info("Adding YarnContainerSecurityManager since security is enabled");
-      this.applicationLauncher.addService(buildYarnContainerSecurityManager(config, this.fs));
+      this.applicationLauncher.addService(buildYarnContainerSecurityManager(this.config, this.fs));
     }
   }
 
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
index 1330bc6..f409f5f 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
@@ -17,8 +17,11 @@
 
 package org.apache.gobblin.yarn;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.io.Closer;
+import com.google.common.util.concurrent.Service;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
@@ -31,8 +34,11 @@ import java.lang.reflect.Field;
 import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeoutException;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingServer;
 import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
@@ -41,8 +47,14 @@ import org.apache.gobblin.cluster.HelixMessageTestBase;
 import org.apache.gobblin.cluster.HelixUtils;
 import org.apache.gobblin.cluster.TestHelper;
 import org.apache.gobblin.cluster.TestShutdownMessageHandlerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.DynamicConfigGenerator;
+import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
 import org.apache.gobblin.testing.AssertWithBackoff;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -278,4 +290,79 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
     Assert.assertEquals(message.getMsgType(), GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE);
     Assert.assertEquals(message.getMsgSubType(), HelixMessageSubTypes.APPLICATION_MASTER_SHUTDOWN.toString());
   }
+
+  /**
+   * Tests that dynamic config is present in the {@link GobblinApplicationMaster} config and in the config of the
+   * {@link YarnService} registered with its application launcher.
+   */
+  @Test
+  public void testDynamicConfig() throws Exception {
+    Config config = this.config.withFallback(
+        ConfigFactory.parseMap(
+        ImmutableMap.of(ConfigurationKeys.DYNAMIC_CONFIG_GENERATOR_CLASS_KEY,
+            TestDynamicConfigGenerator.class.getName())));
+
+    ContainerId containerId = ContainerId.newInstance(
+        ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0), 0);
+    TestApplicationMaster
+        appMaster = new TestApplicationMaster("testApp", containerId, config,
+        new YarnConfiguration());
+
+    Assert.assertEquals(appMaster.getConfig().getString("dynamicKey1"), "dynamicValue1");
+    Assert.assertEquals(appMaster.getConfig().getString(ConfigurationKeys.DYNAMIC_CONFIG_GENERATOR_CLASS_KEY),
+        TestDynamicConfigGenerator.class.getName());
+
+    ServiceBasedAppLauncher appLauncher = appMaster.getAppLauncher();
+    Field servicesField = ServiceBasedAppLauncher.class.getDeclaredField("services");
+    servicesField.setAccessible(true);
+
+    List<Service> services = (List<Service>) servicesField.get(appLauncher);
+
+    Optional<Service> yarnServiceOptional = services.stream().filter(e -> e instanceof YarnService).findFirst();
+
+    Assert.assertTrue(yarnServiceOptional.isPresent());
+
+    YarnService yarnService = (YarnService) yarnServiceOptional.get();
+    Field configField = YarnService.class.getDeclaredField("config");
+    configField.setAccessible(true);
+    Config yarnServiceConfig = (Config) configField.get(yarnService);
+
+    Assert.assertEquals(yarnServiceConfig.getString("dynamicKey1"), "dynamicValue1");
+    Assert.assertEquals(yarnServiceConfig.getString(ConfigurationKeys.DYNAMIC_CONFIG_GENERATOR_CLASS_KEY),
+        TestDynamicConfigGenerator.class.getName());
+  }
+
+  /**
+   * A {@link GobblinApplicationMaster} subclass that exposes the inherited {@code config} and
+   * {@code applicationLauncher} fields through public getters so tests can inspect them.
+   */
+  private static class TestApplicationMaster extends GobblinApplicationMaster {
+    public TestApplicationMaster(String applicationName, ContainerId containerId, Config config,
+        YarnConfiguration yarnConfiguration)
+        throws Exception {
+      super(applicationName, containerId, config, yarnConfiguration);
+    }
+
+    public Config getConfig() {
+      return this.config;
+    }
+
+    public ServiceBasedAppLauncher getAppLauncher() {
+      return this.applicationLauncher;
+    }
+  }
+
+  /**
+   * Test-only {@link DynamicConfigGenerator} that always returns {@code dynamicKey1=dynamicValue1}.
+   */
+  @VisibleForTesting
+  public static class TestDynamicConfigGenerator implements DynamicConfigGenerator {
+    public TestDynamicConfigGenerator() {
+    }
+
+    @Override
+    public Config generateDynamicConfig(Config config) {
+      return ConfigFactory.parseMap(ImmutableMap.of("dynamicKey1", "dynamicValue1"));
+    }
+  }
 }