You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/04 15:48:38 UTC
[1/2] beam git commit: [BEAM-1491] Identify HADOOP_CONF_DIR (or YARN_CONF_DIR) environment variables
Repository: beam
Updated Branches:
refs/heads/master e1d4aa963 -> 588f57a1e
[BEAM-1491] Identify HADOOP_CONF_DIR (or YARN_CONF_DIR) environment variables
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/16717083
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/16717083
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/16717083
Branch: refs/heads/master
Commit: 1671708340fb9fc57cdc91c3bbacdff3ae6af4af
Parents: e1d4aa9
Author: yangping.wu <ya...@qunar.com>
Authored: Thu May 4 14:04:08 2017 +0800
Committer: Luke Cwik <lc...@google.com>
Committed: Thu May 4 08:45:04 2017 -0700
----------------------------------------------------------------------
sdks/java/io/hadoop-file-system/pom.xml | 6 +
.../sdk/io/hdfs/HadoopFileSystemOptions.java | 73 +++++++++--
.../io/hdfs/HadoopFileSystemOptionsTest.java | 125 +++++++++++++++++++
3 files changed, 197 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/16717083/sdks/java/io/hadoop-file-system/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/pom.xml b/sdks/java/io/hadoop-file-system/pom.xml
index 3ec9848..562277e 100644
--- a/sdks/java/io/hadoop-file-system/pom.xml
+++ b/sdks/java/io/hadoop-file-system/pom.xml
@@ -187,6 +187,12 @@
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/beam/blob/16717083/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
index 31250bc..45f43e2 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
@@ -17,12 +17,22 @@
*/
package org.apache.beam.sdk.io.hdfs;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.io.File;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* {@link PipelineOptions} which encapsulate {@link Configuration Hadoop Configuration}
@@ -30,20 +40,69 @@ import org.apache.hadoop.conf.Configuration;
*/
public interface HadoopFileSystemOptions extends PipelineOptions {
@Description("A list of Hadoop configurations used to configure zero or more Hadoop filesystems. "
- + "To specify on the command-line, represent the value as a JSON list of JSON maps, where "
- + "each map represents the entire configuration for a single Hadoop filesystem. For example "
- + "--hdfsConfiguration='[{\"fs.default.name\": \"hdfs://localhost:9998\", ...},"
+ + "By default, Hadoop configuration is loaded from 'core-site.xml' and 'hdfs-site.xml' "
+ + "based upon the HADOOP_CONF_DIR and YARN_CONF_DIR environment variables. "
+ + "To specify configuration on the command-line, represent the value as a JSON list of JSON "
+ + "maps, where each map represents the entire configuration for a single Hadoop filesystem. "
+ + "For example --hdfsConfiguration='[{\"fs.default.name\": \"hdfs://localhost:9998\", ...},"
+ "{\"fs.default.name\": \"s3a://\", ...},...]'")
@Default.InstanceFactory(ConfigurationLocator.class)
List<Configuration> getHdfsConfiguration();
void setHdfsConfiguration(List<Configuration> value);
/** A {@link DefaultValueFactory} which locates a Hadoop {@link Configuration}. */
- class ConfigurationLocator implements DefaultValueFactory<Configuration> {
+ class ConfigurationLocator implements DefaultValueFactory<List<Configuration>> {
+ private static final Logger LOG = LoggerFactory.getLogger(ConfigurationLocator.class);
@Override
- public Configuration create(PipelineOptions options) {
- // TODO: Find default configuration to use
- return null;
+ public List<Configuration> create(PipelineOptions options) {
+ // Find default configuration when HADOOP_CONF_DIR or YARN_CONF_DIR is set.
+ List<Configuration> configurationList = readConfigurationFromHadoopYarnConfigDirs();
+ return configurationList.size() > 0 ? configurationList : null;
+ }
+
+ private List<Configuration> readConfigurationFromHadoopYarnConfigDirs() {
+ List<Configuration> configurationList = Lists.newArrayList();
+
+ /*
+ * If we find a configuration in HADOOP_CONF_DIR and YARN_CONF_DIR,
+ * we should be returning them both separately.
+ *
+ * Also, ensure that we only load one configuration if both
+ * HADOOP_CONF_DIR and YARN_CONF_DIR point to the same location.
+ */
+ Set<String> confDirs = Sets.newHashSet();
+ for (String confDir : Lists.newArrayList("HADOOP_CONF_DIR", "YARN_CONF_DIR")) {
+ if (getEnvironment().containsKey(confDir)) {
+ String hadoopConfDir = getEnvironment().get(confDir);
+ if (!Strings.isNullOrEmpty(hadoopConfDir)) {
+ confDirs.add(hadoopConfDir);
+ }
+ }
+ }
+
+ for (String confDir : confDirs) {
+ if (new File(confDir).exists()) {
+ Configuration conf = new Configuration(false);
+ boolean confLoaded = false;
+ for (String confName : Lists.newArrayList("core-site.xml", "hdfs-site.xml")) {
+ File confFile = new File(confDir, confName);
+ if (confFile.exists()) {
+ LOG.debug("Adding {} to hadoop configuration", confFile.getAbsolutePath());
+ conf.addResource(new Path(confFile.getAbsolutePath()));
+ confLoaded = true;
+ }
+ }
+ if (confLoaded) {
+ configurationList.add(conf);
+ }
+ }
+ }
+ return configurationList;
+ }
+
+ @VisibleForTesting
+ Map<String, String> getEnvironment() {
+ return System.getenv();
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/16717083/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
index 634528b..c99fc5b 100644
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
@@ -18,13 +18,26 @@
package org.apache.beam.sdk.io.hdfs;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
+import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.hadoop.conf.Configuration;
import org.hamcrest.Matchers;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -33,6 +46,8 @@ import org.junit.runners.JUnit4;
*/
@RunWith(JUnit4.class)
public class HadoopFileSystemOptionsTest {
+ @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+
@Test
public void testParsingHdfsConfiguration() {
HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs(
@@ -45,4 +60,114 @@ public class HadoopFileSystemOptionsTest {
assertThat(options.getHdfsConfiguration().get(1), Matchers.<Map.Entry<String, String>>contains(
new AbstractMap.SimpleEntry("propertyB", "B")));
}
+
+ @Test
+ public void testDefaultUnsetEnvHdfsConfiguration() {
+ HadoopFileSystemOptions.ConfigurationLocator projectFactory =
+ spy(new HadoopFileSystemOptions.ConfigurationLocator());
+ when(projectFactory.getEnvironment()).thenReturn(ImmutableMap.<String, String>of());
+ assertNull(projectFactory.create(PipelineOptionsFactory.create()));
+ }
+
+ @Test
+ public void testDefaultJustSetHadoopConfDirConfiguration() throws IOException {
+ Files.write(createPropertyData("A"),
+ tmpFolder.newFile("core-site.xml"), StandardCharsets.UTF_8);
+ Files.write(createPropertyData("B"),
+ tmpFolder.newFile("hdfs-site.xml"), StandardCharsets.UTF_8);
+ HadoopFileSystemOptions.ConfigurationLocator configurationLocator =
+ spy(new HadoopFileSystemOptions.ConfigurationLocator());
+ Map<String, String> environment = Maps.newHashMap();
+ environment.put("HADOOP_CONF_DIR", tmpFolder.getRoot().getAbsolutePath());
+ when(configurationLocator.getEnvironment()).thenReturn(environment);
+
+ List<Configuration> configurationList =
+ configurationLocator.create(PipelineOptionsFactory.create());
+ assertEquals(1, configurationList.size());
+ assertThat(configurationList.get(0).get("propertyA"), Matchers.equalTo("A"));
+ assertThat(configurationList.get(0).get("propertyB"), Matchers.equalTo("B"));
+ }
+
+ @Test
+ public void testDefaultJustSetYarnConfDirConfiguration() throws IOException {
+ Files.write(createPropertyData("A"),
+ tmpFolder.newFile("core-site.xml"), StandardCharsets.UTF_8);
+ Files.write(createPropertyData("B"),
+ tmpFolder.newFile("hdfs-site.xml"), StandardCharsets.UTF_8);
+ HadoopFileSystemOptions.ConfigurationLocator configurationLocator =
+ spy(new HadoopFileSystemOptions.ConfigurationLocator());
+ Map<String, String> environment = Maps.newHashMap();
+ environment.put("YARN_CONF_DIR", tmpFolder.getRoot().getAbsolutePath());
+ when(configurationLocator.getEnvironment()).thenReturn(environment);
+
+ List<Configuration> configurationList =
+ configurationLocator.create(PipelineOptionsFactory.create());
+ assertEquals(1, configurationList.size());
+ assertThat(configurationList.get(0).get("propertyA"), Matchers.equalTo("A"));
+ assertThat(configurationList.get(0).get("propertyB"), Matchers.equalTo("B"));
+ }
+
+ @Test
+ public void testDefaultSetYarnConfDirAndHadoopConfDirAndSameConfiguration() throws IOException {
+ Files.write(createPropertyData("A"),
+ tmpFolder.newFile("core-site.xml"), StandardCharsets.UTF_8);
+ Files.write(createPropertyData("B"),
+ tmpFolder.newFile("hdfs-site.xml"), StandardCharsets.UTF_8);
+ HadoopFileSystemOptions.ConfigurationLocator configurationLocator =
+ spy(new HadoopFileSystemOptions.ConfigurationLocator());
+ Map<String, String> environment = Maps.newHashMap();
+ environment.put("YARN_CONF_DIR", tmpFolder.getRoot().getAbsolutePath());
+ environment.put("HADOOP_CONF_DIR", tmpFolder.getRoot().getAbsolutePath());
+ when(configurationLocator.getEnvironment()).thenReturn(environment);
+
+ List<Configuration> configurationList =
+ configurationLocator.create(PipelineOptionsFactory.create());
+ assertEquals(1, configurationList.size());
+ assertThat(configurationList.get(0).get("propertyA"), Matchers.equalTo("A"));
+ assertThat(configurationList.get(0).get("propertyB"), Matchers.equalTo("B"));
+ }
+
+ @Test
+ public void testDefaultSetYarnConfDirAndHadoopConfDirNotSameConfiguration() throws IOException {
+ File hadoopConfDir = tmpFolder.newFolder("hadoop");
+ File yarnConfDir = tmpFolder.newFolder("yarn");
+ Files.write(createPropertyData("A"),
+ new File(hadoopConfDir, "core-site.xml"), StandardCharsets.UTF_8);
+ Files.write(createPropertyData("B"),
+ new File(hadoopConfDir, "hdfs-site.xml"), StandardCharsets.UTF_8);
+ Files.write(createPropertyData("C"),
+ new File(yarnConfDir, "core-site.xml"), StandardCharsets.UTF_8);
+ Files.write(createPropertyData("D"),
+ new File(yarnConfDir, "hdfs-site.xml"), StandardCharsets.UTF_8);
+ HadoopFileSystemOptions.ConfigurationLocator configurationLocator =
+ spy(new HadoopFileSystemOptions.ConfigurationLocator());
+ Map<String, String> environment = Maps.newHashMap();
+ environment.put("YARN_CONF_DIR", hadoopConfDir.getAbsolutePath());
+ environment.put("HADOOP_CONF_DIR", yarnConfDir.getAbsolutePath());
+ when(configurationLocator.getEnvironment()).thenReturn(environment);
+
+ List<Configuration> configurationList =
+ configurationLocator.create(PipelineOptionsFactory.create());
+ assertEquals(2, configurationList.size());
+ int hadoopConfIndex = configurationList.get(0).get("propertyA") != null ? 0 : 1;
+ assertThat(
+ configurationList.get(hadoopConfIndex).get("propertyA"), Matchers.equalTo("A"));
+ assertThat(
+ configurationList.get(hadoopConfIndex).get("propertyB"), Matchers.equalTo("B"));
+ assertThat(
+ configurationList.get(1 - hadoopConfIndex).get("propertyC"), Matchers.equalTo("C"));
+ assertThat(
+ configurationList.get(1 - hadoopConfIndex).get("propertyD"), Matchers.equalTo("D"));
+ }
+
+ private static String createPropertyData(String property) {
+ return "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+ + "<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>\n"
+ + "<configuration>\n"
+ + " <property>\n"
+ + " <name>property" + property + "</name>\n"
+ + " <value>" + property + "</value>\n"
+ + " </property>\n"
+ + "</configuration>";
+ }
}
[2/2] beam git commit: [BEAM-1491] Identify HADOOP_CONF_DIR (or YARN_CONF_DIR) environment variables
Posted by lc...@apache.org.
[BEAM-1491] Identify HADOOP_CONF_DIR (or YARN_CONF_DIR) environment variables
This closes #2890
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/588f57a1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/588f57a1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/588f57a1
Branch: refs/heads/master
Commit: 588f57a1e6771883df84d06087a93fa4fc4baa54
Parents: e1d4aa9 1671708
Author: Luke Cwik <lc...@google.com>
Authored: Thu May 4 08:48:23 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu May 4 08:48:23 2017 -0700
----------------------------------------------------------------------
sdks/java/io/hadoop-file-system/pom.xml | 6 +
.../sdk/io/hdfs/HadoopFileSystemOptions.java | 73 +++++++++--
.../io/hdfs/HadoopFileSystemOptionsTest.java | 125 +++++++++++++++++++
3 files changed, 197 insertions(+), 7 deletions(-)
----------------------------------------------------------------------