You are viewing a plain text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this text.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/01/31 16:50:01 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1036] Add hadoop override configurations when instantiating FileSystem object in GobblinTaskRunner and GobblinClusterManager

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new d761f66  [GOBBLIN-1036] Add hadoop override configurations when instantiating FileSystem object in GobblinTaskRunner and GobblinClusterManager
d761f66 is described below

commit d761f66c28523f17948790c2a7099181788e3408
Author: sv2000 <su...@gmail.com>
AuthorDate: Fri Jan 31 08:49:54 2020 -0800

    [GOBBLIN-1036] Add hadoop override configurations when instantiating FileSystem object in GobblinTaskRunner and GobblinClusterManager
    
    Closes #2878 from sv2000/parallelRunner
---
 .../gobblin/cluster/GobblinClusterConfigurationKeys.java    |  2 ++
 .../org/apache/gobblin/cluster/GobblinClusterManager.java   | 12 ++++++++++--
 .../java/org/apache/gobblin/cluster/GobblinTaskRunner.java  |  9 ++++++++-
 .../apache/gobblin/cluster/GobblinClusterManagerTest.java   | 12 ++++++++++++
 .../org/apache/gobblin/cluster/GobblinTaskRunnerTest.java   | 13 +++++++++++++
 5 files changed, 45 insertions(+), 3 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 28339f4..787130e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -181,4 +181,6 @@ public class GobblinClusterConfigurationKeys {
   // the cluster
   public static final String IS_HELIX_CLUSTER_MANAGED = GOBBLIN_CLUSTER_PREFIX + "isHelixClusterManaged";
   public static final boolean DEFAULT_IS_HELIX_CLUSTER_MANAGED = false;
+
+  public static final String HADOOP_CONFIG_OVERRIDES_PREFIX = GOBBLIN_CLUSTER_PREFIX + "hadoop.inject";
 }
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 2826720..e1e8fd6 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -73,6 +73,7 @@ import org.apache.gobblin.runtime.app.ApplicationLauncher;
 import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
 import org.apache.gobblin.scheduler.SchedulerService;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.JobConfigurationUtils;
 import org.apache.gobblin.util.JvmUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
@@ -114,6 +115,7 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
 
   protected final Path appWorkDir;
 
+  @Getter
   protected final FileSystem fs;
 
   protected final String applicationId;
@@ -353,9 +355,15 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
    * Build the {@link FileSystem} for the Application Master.
    */
   private FileSystem buildFileSystem(Config config) throws IOException {
+    Config hadoopOverrides = ConfigUtils.getConfigOrEmpty(config, GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX);
+
+    Configuration conf = new Configuration();
+    //Add any Hadoop-specific overrides into the Configuration object
+    JobConfigurationUtils.putPropertiesIntoConfiguration(ConfigUtils.configToProperties(hadoopOverrides), conf);
+
     return config.hasPath(ConfigurationKeys.FS_URI_KEY) ? FileSystem
-        .get(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), new Configuration())
-        : FileSystem.get(new Configuration());
+        .get(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), conf)
+        : FileSystem.get(conf);
   }
 
   /**
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 5ada074..22c21bf 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -69,6 +69,8 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
+import lombok.Getter;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.StandardMetricsBridge;
@@ -77,6 +79,7 @@ import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.FileUtils;
 import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.JobConfigurationUtils;
 import org.apache.gobblin.util.JvmUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
@@ -108,7 +111,6 @@ import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER
  */
 @Alpha
 public class GobblinTaskRunner implements StandardMetricsBridge {
-
   private static final Logger logger = LoggerFactory.getLogger(GobblinTaskRunner.class);
   static final java.nio.file.Path CLUSTER_CONF_PATH = Paths.get("generated-gobblin-cluster.conf");
 
@@ -140,6 +142,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
 
   protected final Config config;
 
+  @Getter
   protected final FileSystem fs;
   private final List<Service> services = Lists.newArrayList();
   protected final String applicationName;
@@ -420,6 +423,10 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
 
   private FileSystem buildFileSystem(Config config, Configuration conf)
       throws IOException {
+    Config hadoopOverrides = ConfigUtils.getConfigOrEmpty(config, GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX);
+
+    //Add any Hadoop-specific overrides into the Configuration object
+    JobConfigurationUtils.putPropertiesIntoConfiguration(ConfigUtils.configToProperties(hadoopOverrides), conf);
     return config.hasPath(ConfigurationKeys.FS_URI_KEY) ? FileSystem
         .get(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), conf)
         : FileSystem.get(conf);
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterManagerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterManagerTest.java
index 9069c1e..840c8a6 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterManagerTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterManagerTest.java
@@ -21,6 +21,7 @@ import java.net.URL;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
@@ -61,6 +62,7 @@ import org.apache.gobblin.testing.AssertWithBackoff;
 @Test(groups = { "gobblin.cluster" })
 public class GobblinClusterManagerTest implements HelixMessageTestBase {
   public final static Logger LOG = LoggerFactory.getLogger(GobblinClusterManagerTest.class);
+  public static final String HADOOP_OVERRIDE_PROPERTY_NAME = "prop";
 
   private TestingServer testingZKServer;
 
@@ -83,6 +85,10 @@ public class GobblinClusterManagerTest implements HelixMessageTestBase {
                    ConfigValueFactory.fromAnyRef(testingZKServer.getConnectString()))
         .withValue(GobblinClusterConfigurationKeys.HELIX_TASK_QUOTA_CONFIG_KEY,
             ConfigValueFactory.fromAnyRef("DEFAULT:1,OTHER:10"))
+        .withValue(GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX + "." + HADOOP_OVERRIDE_PROPERTY_NAME,
+            ConfigValueFactory.fromAnyRef("value"))
+        .withValue(GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX + "." + "fs.file.impl.disable.cache",
+            ConfigValueFactory.fromAnyRef("true"))
         .resolve();
 
     String zkConnectionString = config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
@@ -176,6 +182,12 @@ public class GobblinClusterManagerTest implements HelixMessageTestBase {
         }, "Cluster Manager shutdown");
   }
 
+  @Test
+  public void testBuildFileSystemConfig() {
+    FileSystem fileSystem = this.gobblinClusterManager.getFs();
+    Assert.assertEquals(fileSystem.getConf().get(HADOOP_OVERRIDE_PROPERTY_NAME), "value");
+  }
+
   @AfterClass
   public void tearDown() throws Exception {
     try {
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
index 286c50a..b115607 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.URL;
 
 import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,6 +53,8 @@ import org.apache.gobblin.testing.AssertWithBackoff;
 public class GobblinTaskRunnerTest {
   public final static Logger LOG = LoggerFactory.getLogger(GobblinTaskRunnerTest.class);
 
+  public static final String HADOOP_OVERRIDE_PROPERTY_NAME = "prop";
+
   private TestingServer testingZKServer;
 
   private GobblinTaskRunner gobblinTaskRunner;
@@ -70,6 +73,10 @@ public class GobblinTaskRunnerTest {
     Config config = ConfigFactory.parseURL(url)
         .withValue("gobblin.cluster.zk.connection.string",
                    ConfigValueFactory.fromAnyRef(testingZKServer.getConnectString()))
+        .withValue(GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX + "." + HADOOP_OVERRIDE_PROPERTY_NAME,
+            ConfigValueFactory.fromAnyRef("value"))
+        .withValue(GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX + "." + "fs.file.impl.disable.cache",
+            ConfigValueFactory.fromAnyRef("true"))
         .resolve();
 
     String zkConnectionString = config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
@@ -104,6 +111,12 @@ public class GobblinTaskRunnerTest {
       }, "gobblinTaskRunner stopped");
   }
 
+  @Test
+  public void testBuildFileSystemConfig() {
+    FileSystem fileSystem = this.gobblinTaskRunner.getFs();
+    Assert.assertEquals(fileSystem.getConf().get(HADOOP_OVERRIDE_PROPERTY_NAME), "value");
+  }
+
   @AfterClass
   public void tearDown() throws IOException {
     try {