You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/04/28 23:15:06 UTC
[1/2] beam git commit: [BEAM-2031] Add support to pass through zero
or more Hadoop configurations through as PipelineOptions
Repository: beam
Updated Branches:
refs/heads/master a3e7383cc -> 7478da997
[BEAM-2031] Add support to pass through zero or more Hadoop configurations through as PipelineOptions
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/864e2ad2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/864e2ad2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/864e2ad2
Branch: refs/heads/master
Commit: 864e2ad26e3748b414d10c53c28e5e18fb4a53e7
Parents: a3e7383
Author: Luke Cwik <lc...@google.com>
Authored: Fri Apr 28 10:30:24 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Apr 28 16:14:34 2017 -0700
----------------------------------------------------------------------
sdks/java/io/hdfs/pom.xml | 10 +++
.../sdk/io/hdfs/HadoopFileSystemModule.java | 84 ++++++++++++++++++++
.../sdk/io/hdfs/HadoopFileSystemOptions.java | 49 ++++++++++++
.../hdfs/HadoopFileSystemOptionsRegistrar.java | 35 ++++++++
.../sdk/io/hdfs/HadoopFileSystemModuleTest.java | 65 +++++++++++++++
.../HadoopFileSystemOptionsRegistrarTest.java | 49 ++++++++++++
.../io/hdfs/HadoopFileSystemOptionsTest.java | 48 +++++++++++
7 files changed, 340 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/864e2ad2/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
index 91c2cf7..46cf8cf 100644
--- a/sdks/java/io/hdfs/pom.xml
+++ b/sdks/java/io/hdfs/pom.xml
@@ -56,6 +56,16 @@
</dependency>
<dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
<optional>true</optional>
http://git-wip-us.apache.org/repos/asf/beam/blob/864e2ad2/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java
new file mode 100644
index 0000000..2cb9d8a
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Jackson {@link Module} that teaches an object mapper how to read and write a Hadoop
+ * {@link Configuration} through a {@link JsonDeserializer} and {@link JsonSerializer} pair.
+ *
+ * <p>The JSON form is a flat map of property names to values; the configuration hierarchy and
+ * the source of each property are not part of the serialized representation.
+ */
+@AutoService(Module.class)
+public class HadoopFileSystemModule extends SimpleModule {
+  public HadoopFileSystemModule() {
+    super("HadoopFileSystemModule");
+    setMixInAnnotation(Configuration.class, ConfigurationMixin.class);
+  }
+
+  /** Carries the Jackson annotations that are mixed into the Hadoop {@link Configuration}. */
+  @JsonDeserialize(using = ConfigurationDeserializer.class)
+  @JsonSerialize(using = ConfigurationSerializer.class)
+  private static class ConfigurationMixin {}
+
+  /** Builds a Hadoop {@link Configuration} from a JSON map of property names to values. */
+  static class ConfigurationDeserializer extends JsonDeserializer<Configuration> {
+    @Override
+    public Configuration deserialize(JsonParser parser, DeserializationContext context)
+        throws IOException, JsonProcessingException {
+      Map<String, String> properties =
+          parser.readValueAs(new TypeReference<Map<String, String>>() {});
+      Configuration result = new Configuration(false);
+      for (Map.Entry<String, String> property : properties.entrySet()) {
+        result.set(property.getKey(), property.getValue());
+      }
+      return result;
+    }
+  }
+
+  /** Writes a Hadoop {@link Configuration} as a JSON map whose keys are sorted by name. */
+  static class ConfigurationSerializer extends JsonSerializer<Configuration> {
+    @Override
+    public void serialize(Configuration value, JsonGenerator generator,
+        SerializerProvider provider) throws IOException, JsonProcessingException {
+      Map<String, String> sortedProperties = new TreeMap<>();
+      for (Map.Entry<String, String> property : value) {
+        sortedProperties.put(property.getKey(), property.getValue());
+      }
+      generator.writeObject(sortedProperties);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/864e2ad2/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
new file mode 100644
index 0000000..31250bc
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import java.util.List;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * {@link PipelineOptions} which encapsulate the {@link Configuration Hadoop Configurations}
+ * used by the {@link HadoopFileSystem}; the default factory currently yields {@code null}.
+ */
+public interface HadoopFileSystemOptions extends PipelineOptions {
+  @Description("A list of Hadoop configurations used to configure zero or more Hadoop filesystems. "
+      + "To specify on the command-line, represent the value as a JSON list of JSON maps, where "
+      + "each map represents the entire configuration for a single Hadoop filesystem. For example "
+      + "--hdfsConfiguration='[{\"fs.default.name\": \"hdfs://localhost:9998\", ...},"
+      + "{\"fs.default.name\": \"s3a://\", ...},...]'")
+  @Default.InstanceFactory(ConfigurationLocator.class)
+  List<Configuration> getHdfsConfiguration();
+  void setHdfsConfiguration(List<Configuration> value);
+
+  /** Default for {@code hdfsConfiguration}; typed as a list to match the getter it serves. */
+  class ConfigurationLocator implements DefaultValueFactory<List<Configuration>> {
+    @Override
+    public List<Configuration> create(PipelineOptions options) {
+      // TODO: Find default configuration to use; until then no default list is supplied.
+      return null;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/864e2ad2/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java
new file mode 100644
index 0000000..344623b
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+
+/**
+ * Registers {@link HadoopFileSystemOptions} as pipeline options; wired up by {@link AutoService}.
+ */
+@AutoService(PipelineOptionsRegistrar.class)
+public class HadoopFileSystemOptionsRegistrar implements PipelineOptionsRegistrar {
+  @Override
+  public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+    Class<? extends PipelineOptions> optionsClass = HadoopFileSystemOptions.class;
+    return ImmutableList.<Class<? extends PipelineOptions>>of(optionsClass);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/864e2ad2/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java
new file mode 100644
index 0000000..6963116
--- /dev/null
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertThat;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.AbstractMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+import org.apache.hadoop.conf.Configuration;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link HadoopFileSystemModule}: module discovery and a JSON round trip. */
+@RunWith(JUnit4.class)
+public class HadoopFileSystemModuleTest {
+  @Test
+  public void testObjectMapperIsAbleToFindModule() throws Exception {
+    List<Module> modules = ObjectMapper.findModules(ReflectHelpers.findClassLoader());
+    assertThat(modules, hasItem(Matchers.<Module>instanceOf(HadoopFileSystemModule.class)));
+  }
+
+  @Test
+  public void testConfigurationSerializationDeserialization() throws Exception {
+    // Layer a configuration over a base resource so the round trip has a hierarchy to flatten.
+    Configuration baseConfiguration = new Configuration(false);
+    baseConfiguration.set("testPropertyA", "baseA");
+    baseConfiguration.set("testPropertyC", "baseC");
+    Configuration configuration = new Configuration(false);
+    configuration.addResource(baseConfiguration);
+    configuration.set("testPropertyA", "A");
+    configuration.set("testPropertyB", "B");
+    ObjectMapper objectMapper = new ObjectMapper();
+    objectMapper.registerModule(new HadoopFileSystemModule());
+    String serializedConfiguration = objectMapper.writeValueAsString(configuration);
+    Configuration deserializedConfiguration =
+        objectMapper.readValue(serializedConfiguration, Configuration.class);
+    // Expect the overriding value for A, and the base-only key C to survive the round trip.
+    assertThat(deserializedConfiguration, Matchers.<Map.Entry<String, String>>contains(
+        new AbstractMap.SimpleEntry<String, String>("testPropertyA", "A"),
+        new AbstractMap.SimpleEntry<String, String>("testPropertyB", "B"),
+        new AbstractMap.SimpleEntry<String, String>("testPropertyC", "baseC")));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/864e2ad2/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java
new file mode 100644
index 0000000..2be3d93
--- /dev/null
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import java.util.ServiceLoader;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Checks that {@link HadoopFileSystemOptionsRegistrar} is found by {@link ServiceLoader}. */
+@RunWith(JUnit4.class)
+public class HadoopFileSystemOptionsRegistrarTest {
+  @Test
+  public void testServiceLoader() {
+    ServiceLoader<PipelineOptionsRegistrar> loader =
+        ServiceLoader.load(PipelineOptionsRegistrar.class);
+    // Only the first matching registrar is inspected; the test fails if none is registered.
+    for (PipelineOptionsRegistrar candidate : Lists.newArrayList(loader.iterator())) {
+      if (!(candidate instanceof HadoopFileSystemOptionsRegistrar)) {
+        continue;
+      }
+      assertThat(candidate.getPipelineOptions(),
+          Matchers.<Class<?>>contains(HadoopFileSystemOptions.class));
+      return;
+    }
+    fail("Expected to find " + HadoopFileSystemOptionsRegistrar.class);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/864e2ad2/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
new file mode 100644
index 0000000..634528b
--- /dev/null
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.util.AbstractMap;
+import java.util.Map;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link HadoopFileSystemOptions}: parsing {@code --hdfsConfiguration} from JSON. */
+@RunWith(JUnit4.class)
+public class HadoopFileSystemOptionsTest {
+  @Test
+  public void testParsingHdfsConfiguration() {
+    // A JSON list holding two single-property maps.
+    HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs(
+        "--hdfsConfiguration=["
+        + "{\"propertyA\": \"A\"},"
+        + "{\"propertyB\": \"B\"}]").as(HadoopFileSystemOptions.class);
+    // Each JSON map is expected to become its own configuration, in the order given.
+    assertEquals(2, options.getHdfsConfiguration().size());
+    assertThat(options.getHdfsConfiguration().get(0), Matchers.<Map.Entry<String, String>>contains(
+        new AbstractMap.SimpleEntry<String, String>("propertyA", "A")));
+    assertThat(options.getHdfsConfiguration().get(1), Matchers.<Map.Entry<String, String>>contains(
+        new AbstractMap.SimpleEntry<String, String>("propertyB", "B")));
+  }
+}
[2/2] beam git commit: [BEAM-2031] Add support to pass through zero
or more Hadoop configurations through as PipelineOptions
Posted by lc...@apache.org.
[BEAM-2031] Add support to pass through zero or more Hadoop configurations through as PipelineOptions
This closes #2762
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7478da99
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7478da99
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7478da99
Branch: refs/heads/master
Commit: 7478da9975e0c73af562946efccd10ec64b2ef0e
Parents: a3e7383 864e2ad
Author: Luke Cwik <lc...@google.com>
Authored: Fri Apr 28 16:14:58 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Apr 28 16:14:58 2017 -0700
----------------------------------------------------------------------
sdks/java/io/hdfs/pom.xml | 10 +++
.../sdk/io/hdfs/HadoopFileSystemModule.java | 84 ++++++++++++++++++++
.../sdk/io/hdfs/HadoopFileSystemOptions.java | 49 ++++++++++++
.../hdfs/HadoopFileSystemOptionsRegistrar.java | 35 ++++++++
.../sdk/io/hdfs/HadoopFileSystemModuleTest.java | 65 +++++++++++++++
.../HadoopFileSystemOptionsRegistrarTest.java | 49 ++++++++++++
.../io/hdfs/HadoopFileSystemOptionsTest.java | 48 +++++++++++
7 files changed, 340 insertions(+)
----------------------------------------------------------------------