You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/01/31 16:50:01 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1036] Add hadoop override configurations when instantiating FileSystem object in GobblinTaskRunner and GobblinClusterManager
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new d761f66 [GOBBLIN-1036] Add hadoop override configurations when instantiating FileSystem object in GobblinTaskRunner and GobblinClusterManager
d761f66 is described below
commit d761f66c28523f17948790c2a7099181788e3408
Author: sv2000 <su...@gmail.com>
AuthorDate: Fri Jan 31 08:49:54 2020 -0800
[GOBBLIN-1036] Add hadoop override configurations when instantiating FileSystem object in GobblinTaskRunner and GobblinClusterManager
Closes #2878 from sv2000/parallelRunner
---
.../gobblin/cluster/GobblinClusterConfigurationKeys.java | 2 ++
.../org/apache/gobblin/cluster/GobblinClusterManager.java | 12 ++++++++++--
.../java/org/apache/gobblin/cluster/GobblinTaskRunner.java | 9 ++++++++-
.../apache/gobblin/cluster/GobblinClusterManagerTest.java | 12 ++++++++++++
.../org/apache/gobblin/cluster/GobblinTaskRunnerTest.java | 13 +++++++++++++
5 files changed, 45 insertions(+), 3 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 28339f4..787130e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -181,4 +181,6 @@ public class GobblinClusterConfigurationKeys {
// the cluster
public static final String IS_HELIX_CLUSTER_MANAGED = GOBBLIN_CLUSTER_PREFIX + "isHelixClusterManaged";
public static final boolean DEFAULT_IS_HELIX_CLUSTER_MANAGED = false;
+
+ public static final String HADOOP_CONFIG_OVERRIDES_PREFIX = GOBBLIN_CLUSTER_PREFIX + "hadoop.inject";
}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 2826720..e1e8fd6 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -73,6 +73,7 @@ import org.apache.gobblin.runtime.app.ApplicationLauncher;
import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.JobConfigurationUtils;
import org.apache.gobblin.util.JvmUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@@ -114,6 +115,7 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
protected final Path appWorkDir;
+ @Getter
protected final FileSystem fs;
protected final String applicationId;
@@ -353,9 +355,15 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
* Build the {@link FileSystem} for the Application Master.
*/
private FileSystem buildFileSystem(Config config) throws IOException {
+ Config hadoopOverrides = ConfigUtils.getConfigOrEmpty(config, GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX);
+
+ Configuration conf = new Configuration();
+ //Add any Hadoop-specific overrides into the Configuration object
+ JobConfigurationUtils.putPropertiesIntoConfiguration(ConfigUtils.configToProperties(hadoopOverrides), conf);
+
return config.hasPath(ConfigurationKeys.FS_URI_KEY) ? FileSystem
- .get(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), new Configuration())
- : FileSystem.get(new Configuration());
+ .get(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), conf)
+ : FileSystem.get(conf);
}
/**
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 5ada074..22c21bf 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -69,6 +69,8 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
+import lombok.Getter;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.StandardMetricsBridge;
@@ -77,6 +79,7 @@ import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.FileUtils;
import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.JobConfigurationUtils;
import org.apache.gobblin.util.JvmUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@@ -108,7 +111,6 @@ import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER
*/
@Alpha
public class GobblinTaskRunner implements StandardMetricsBridge {
-
private static final Logger logger = LoggerFactory.getLogger(GobblinTaskRunner.class);
static final java.nio.file.Path CLUSTER_CONF_PATH = Paths.get("generated-gobblin-cluster.conf");
@@ -140,6 +142,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
protected final Config config;
+ @Getter
protected final FileSystem fs;
private final List<Service> services = Lists.newArrayList();
protected final String applicationName;
@@ -420,6 +423,10 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
private FileSystem buildFileSystem(Config config, Configuration conf)
throws IOException {
+ Config hadoopOverrides = ConfigUtils.getConfigOrEmpty(config, GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX);
+
+ //Add any Hadoop-specific overrides into the Configuration object
+ JobConfigurationUtils.putPropertiesIntoConfiguration(ConfigUtils.configToProperties(hadoopOverrides), conf);
return config.hasPath(ConfigurationKeys.FS_URI_KEY) ? FileSystem
.get(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), conf)
: FileSystem.get(conf);
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterManagerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterManagerTest.java
index 9069c1e..840c8a6 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterManagerTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterManagerTest.java
@@ -21,6 +21,7 @@ import java.net.URL;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
@@ -61,6 +62,7 @@ import org.apache.gobblin.testing.AssertWithBackoff;
@Test(groups = { "gobblin.cluster" })
public class GobblinClusterManagerTest implements HelixMessageTestBase {
public final static Logger LOG = LoggerFactory.getLogger(GobblinClusterManagerTest.class);
+ public static final String HADOOP_OVERRIDE_PROPERTY_NAME = "prop";
private TestingServer testingZKServer;
@@ -83,6 +85,10 @@ public class GobblinClusterManagerTest implements HelixMessageTestBase {
ConfigValueFactory.fromAnyRef(testingZKServer.getConnectString()))
.withValue(GobblinClusterConfigurationKeys.HELIX_TASK_QUOTA_CONFIG_KEY,
ConfigValueFactory.fromAnyRef("DEFAULT:1,OTHER:10"))
+ .withValue(GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX + "." + HADOOP_OVERRIDE_PROPERTY_NAME,
+ ConfigValueFactory.fromAnyRef("value"))
+ .withValue(GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX + "." + "fs.file.impl.disable.cache",
+ ConfigValueFactory.fromAnyRef("true"))
.resolve();
String zkConnectionString = config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
@@ -176,6 +182,12 @@ public class GobblinClusterManagerTest implements HelixMessageTestBase {
}, "Cluster Manager shutdown");
}
+ @Test
+ public void testBuildFileSystemConfig() {
+ FileSystem fileSystem = this.gobblinClusterManager.getFs();
+ Assert.assertEquals(fileSystem.getConf().get(HADOOP_OVERRIDE_PROPERTY_NAME), "value");
+ }
+
@AfterClass
public void tearDown() throws Exception {
try {
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
index 286c50a..b115607 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.net.URL;
import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +53,8 @@ import org.apache.gobblin.testing.AssertWithBackoff;
public class GobblinTaskRunnerTest {
public final static Logger LOG = LoggerFactory.getLogger(GobblinTaskRunnerTest.class);
+ public static final String HADOOP_OVERRIDE_PROPERTY_NAME = "prop";
+
private TestingServer testingZKServer;
private GobblinTaskRunner gobblinTaskRunner;
@@ -70,6 +73,10 @@ public class GobblinTaskRunnerTest {
Config config = ConfigFactory.parseURL(url)
.withValue("gobblin.cluster.zk.connection.string",
ConfigValueFactory.fromAnyRef(testingZKServer.getConnectString()))
+ .withValue(GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX + "." + HADOOP_OVERRIDE_PROPERTY_NAME,
+ ConfigValueFactory.fromAnyRef("value"))
+ .withValue(GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX + "." + "fs.file.impl.disable.cache",
+ ConfigValueFactory.fromAnyRef("true"))
.resolve();
String zkConnectionString = config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
@@ -104,6 +111,12 @@ public class GobblinTaskRunnerTest {
}, "gobblinTaskRunner stopped");
}
+ @Test
+ public void testBuildFileSystemConfig() {
+ FileSystem fileSystem = this.gobblinTaskRunner.getFs();
+ Assert.assertEquals(fileSystem.getConf().get(HADOOP_OVERRIDE_PROPERTY_NAME), "value");
+ }
+
@AfterClass
public void tearDown() throws IOException {
try {