You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/04/28 23:15:06 UTC

[1/2] beam git commit: [BEAM-2031] Add support for passing zero or more Hadoop configurations through as PipelineOptions

Repository: beam
Updated Branches:
  refs/heads/master a3e7383cc -> 7478da997


[BEAM-2031] Add support for passing zero or more Hadoop configurations through as PipelineOptions


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/864e2ad2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/864e2ad2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/864e2ad2

Branch: refs/heads/master
Commit: 864e2ad26e3748b414d10c53c28e5e18fb4a53e7
Parents: a3e7383
Author: Luke Cwik <lc...@google.com>
Authored: Fri Apr 28 10:30:24 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Apr 28 16:14:34 2017 -0700

----------------------------------------------------------------------
 sdks/java/io/hdfs/pom.xml                       | 10 +++
 .../sdk/io/hdfs/HadoopFileSystemModule.java     | 84 ++++++++++++++++++++
 .../sdk/io/hdfs/HadoopFileSystemOptions.java    | 49 ++++++++++++
 .../hdfs/HadoopFileSystemOptionsRegistrar.java  | 35 ++++++++
 .../sdk/io/hdfs/HadoopFileSystemModuleTest.java | 65 +++++++++++++++
 .../HadoopFileSystemOptionsRegistrarTest.java   | 49 ++++++++++++
 .../io/hdfs/HadoopFileSystemOptionsTest.java    | 48 +++++++++++
 7 files changed, 340 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/864e2ad2/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
index 91c2cf7..46cf8cf 100644
--- a/sdks/java/io/hdfs/pom.xml
+++ b/sdks/java/io/hdfs/pom.xml
@@ -56,6 +56,16 @@
     </dependency>
 
     <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.auto.service</groupId>
       <artifactId>auto-service</artifactId>
       <optional>true</optional>

http://git-wip-us.apache.org/repos/asf/beam/blob/864e2ad2/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java
new file mode 100644
index 0000000..2cb9d8a
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A Jackson {@link Module} which teaches an object mapper how to read and write a Hadoop
+ * {@link Configuration} via a {@link JsonDeserializer} and {@link JsonSerializer} pair.
+ *
+ * <p>On the wire a {@link Configuration} is a flat JSON map of property name to value; resource
+ * layering and the origin of each property are not retained.
+ */
+@AutoService(Module.class)
+public class HadoopFileSystemModule extends SimpleModule {
+  public HadoopFileSystemModule() {
+    super("HadoopFileSystemModule");
+    setMixInAnnotation(Configuration.class, ConfigurationMixin.class);
+  }
+
+  /** Carries the Jackson annotations that get mixed in to the Hadoop {@link Configuration}. */
+  @JsonSerialize(using = ConfigurationSerializer.class)
+  @JsonDeserialize(using = ConfigurationDeserializer.class)
+  private static class ConfigurationMixin {}
+
+  /** Builds a Hadoop {@link Configuration} from a JSON map of string properties. */
+  static class ConfigurationDeserializer extends JsonDeserializer<Configuration> {
+    @Override
+    public Configuration deserialize(JsonParser parser, DeserializationContext context)
+        throws IOException, JsonProcessingException {
+      TypeReference<Map<String, String>> mapType = new TypeReference<Map<String, String>>() {};
+      Map<String, String> properties = parser.readValueAs(mapType);
+      Configuration result = new Configuration(false);
+      for (Map.Entry<String, String> property : properties.entrySet()) {
+        result.set(property.getKey(), property.getValue());
+      }
+      return result;
+    }
+  }
+
+  /** Writes a Hadoop {@link Configuration} out as a JSON map ordered by property name. */
+  static class ConfigurationSerializer extends JsonSerializer<Configuration> {
+    @Override
+    public void serialize(Configuration value, JsonGenerator generator,
+        SerializerProvider provider) throws IOException, JsonProcessingException {
+      Map<String, String> sortedProperties = new TreeMap<>();
+      for (Map.Entry<String, String> property : value) {
+        sortedProperties.put(property.getKey(), property.getValue());
+      }
+      generator.writeObject(sortedProperties);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/864e2ad2/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
new file mode 100644
index 0000000..31250bc
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import java.util.List;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * {@link PipelineOptions} which carry the {@link Configuration Hadoop Configuration}s
+ * used by the {@link HadoopFileSystem}.
+ */
+public interface HadoopFileSystemOptions extends PipelineOptions {
+  @Description("A list of Hadoop configurations used to configure zero or more Hadoop filesystems. "
+      + "To specify on the command-line, represent the value as a JSON list of JSON maps, where "
+      + "each map represents the entire configuration for a single Hadoop filesystem. For example "
+      + "--hdfsConfiguration='[{\"fs.default.name\": \"hdfs://localhost:9998\", ...},"
+      + "{\"fs.default.name\": \"s3a://\", ...},...]'")
+  @Default.InstanceFactory(ConfigurationLocator.class)
+  List<Configuration> getHdfsConfiguration();
+  void setHdfsConfiguration(List<Configuration> value);
+
+  /** A {@link DefaultValueFactory} which locates the default list of {@link Configuration}s. */
+  class ConfigurationLocator implements DefaultValueFactory<List<Configuration>> {
+    @Override
+    public List<Configuration> create(PipelineOptions options) {
+      // TODO: Find default configuration to use; until then no default is provided.
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/864e2ad2/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java
new file mode 100644
index 0000000..344623b
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+
+/**
+ * Registers {@link HadoopFileSystemOptions} as pipeline options; exposed via {@link AutoService}.
+ */
+@AutoService(PipelineOptionsRegistrar.class)
+public class HadoopFileSystemOptionsRegistrar implements PipelineOptionsRegistrar {
+  @Override
+  public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+    Class<? extends PipelineOptions> optionsClass = HadoopFileSystemOptions.class;
+    return ImmutableList.<Class<? extends PipelineOptions>>of(optionsClass);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/864e2ad2/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java
new file mode 100644
index 0000000..6963116
--- /dev/null
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertThat;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.AbstractMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+import org.apache.hadoop.conf.Configuration;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link HadoopFileSystemModule}. */
+@RunWith(JUnit4.class)
+public class HadoopFileSystemModuleTest {
+  @Test
+  public void testObjectMapperIsAbleToFindModule() throws Exception {
+    List<Module> modules = ObjectMapper.findModules(ReflectHelpers.findClassLoader());
+    assertThat(modules, hasItem(Matchers.<Module>instanceOf(HadoopFileSystemModule.class)));
+  }
+
+  @Test
+  public void testConfigurationSerializationDeserialization() throws Exception {
+    // Stack overrides on a base resource so that flattening of the hierarchy is exercised.
+    Configuration baseConfiguration = new Configuration(false);
+    baseConfiguration.set("testPropertyA", "baseA");
+    baseConfiguration.set("testPropertyC", "baseC");
+    Configuration configuration = new Configuration(false);
+    configuration.addResource(baseConfiguration);
+    configuration.set("testPropertyA", "A");
+    configuration.set("testPropertyB", "B");
+    ObjectMapper objectMapper = new ObjectMapper();
+    objectMapper.registerModule(new HadoopFileSystemModule());
+    String serializedConfiguration = objectMapper.writeValueAsString(configuration);
+    // Only the effective key/value pairs are expected after the round trip.
+    Configuration deserializedConfiguration =
+        objectMapper.readValue(serializedConfiguration, Configuration.class);
+    assertThat(deserializedConfiguration, Matchers.<Map.Entry<String, String>>contains(
+        new AbstractMap.SimpleEntry<>("testPropertyA", "A"),
+        new AbstractMap.SimpleEntry<>("testPropertyB", "B"),
+        new AbstractMap.SimpleEntry<>("testPropertyC", "baseC")));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/864e2ad2/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java
new file mode 100644
index 0000000..2be3d93
--- /dev/null
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import java.util.ServiceLoader;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link HadoopFileSystemOptionsRegistrar}. */
+@RunWith(JUnit4.class)
+public class HadoopFileSystemOptionsRegistrarTest {
+
+  @Test
+  public void testServiceLoader() {
+    // Scan every registrar the ServiceLoader can see and verify ours once it turns up.
+    for (PipelineOptionsRegistrar candidate
+        : Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator())) {
+      if (!(candidate instanceof HadoopFileSystemOptionsRegistrar)) {
+        continue;
+      }
+      assertThat(candidate.getPipelineOptions(),
+          Matchers.<Class<?>>contains(HadoopFileSystemOptions.class));
+      return;
+    }
+    fail("Expected to find " + HadoopFileSystemOptionsRegistrar.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/864e2ad2/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
new file mode 100644
index 0000000..634528b
--- /dev/null
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.util.AbstractMap;
+import java.util.Map;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link HadoopFileSystemOptions}. */
+@RunWith(JUnit4.class)
+public class HadoopFileSystemOptionsTest {
+  @Test
+  public void testParsingHdfsConfiguration() {
+    // The option value is a JSON list holding one JSON map per Hadoop configuration.
+    HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs(
+        "--hdfsConfiguration=["
+            + "{\"propertyA\": \"A\"},"
+            + "{\"propertyB\": \"B\"}]").as(HadoopFileSystemOptions.class);
+    assertEquals(2, options.getHdfsConfiguration().size());
+    // Each map must come back as its own configuration, in the order given.
+    assertThat(options.getHdfsConfiguration().get(0), Matchers.<Map.Entry<String, String>>contains(
+        new AbstractMap.SimpleEntry<>("propertyA", "A")));
+    assertThat(options.getHdfsConfiguration().get(1), Matchers.<Map.Entry<String, String>>contains(
+        new AbstractMap.SimpleEntry<>("propertyB", "B")));
+  }
+}


[2/2] beam git commit: [BEAM-2031] Add support for passing zero or more Hadoop configurations through as PipelineOptions

Posted by lc...@apache.org.
[BEAM-2031] Add support for passing zero or more Hadoop configurations through as PipelineOptions

This closes #2762


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7478da99
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7478da99
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7478da99

Branch: refs/heads/master
Commit: 7478da9975e0c73af562946efccd10ec64b2ef0e
Parents: a3e7383 864e2ad
Author: Luke Cwik <lc...@google.com>
Authored: Fri Apr 28 16:14:58 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Apr 28 16:14:58 2017 -0700

----------------------------------------------------------------------
 sdks/java/io/hdfs/pom.xml                       | 10 +++
 .../sdk/io/hdfs/HadoopFileSystemModule.java     | 84 ++++++++++++++++++++
 .../sdk/io/hdfs/HadoopFileSystemOptions.java    | 49 ++++++++++++
 .../hdfs/HadoopFileSystemOptionsRegistrar.java  | 35 ++++++++
 .../sdk/io/hdfs/HadoopFileSystemModuleTest.java | 65 +++++++++++++++
 .../HadoopFileSystemOptionsRegistrarTest.java   | 49 ++++++++++++
 .../io/hdfs/HadoopFileSystemOptionsTest.java    | 48 +++++++++++
 7 files changed, 340 insertions(+)
----------------------------------------------------------------------