Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/02 19:58:36 UTC
[4/5] beam git commit: [BEAM-2135] Move hdfs to hadoop-file-system
[BEAM-2135] Move hdfs to hadoop-file-system
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bacd33c8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bacd33c8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bacd33c8
Branch: refs/heads/master
Commit: bacd33c81d99f4a3d1b11eb391a7f790087fc2a1
Parents: 3161904
Author: Luke Cwik <lc...@google.com>
Authored: Tue May 2 09:20:49 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue May 2 12:57:44 2017 -0700
----------------------------------------------------------------------
sdks/java/io/hadoop-file-system/README.md | 43 ++
sdks/java/io/hadoop-file-system/pom.xml | 195 ++++++
.../apache/beam/sdk/io/hdfs/HDFSFileSink.java | 478 ++++++++++++++
.../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 625 +++++++++++++++++++
.../beam/sdk/io/hdfs/HadoopFileSystem.java | 240 +++++++
.../sdk/io/hdfs/HadoopFileSystemModule.java | 84 +++
.../sdk/io/hdfs/HadoopFileSystemOptions.java | 49 ++
.../hdfs/HadoopFileSystemOptionsRegistrar.java | 35 ++
.../sdk/io/hdfs/HadoopFileSystemRegistrar.java | 62 ++
.../beam/sdk/io/hdfs/HadoopResourceId.java | 81 +++
.../java/org/apache/beam/sdk/io/hdfs/Sink.java | 195 ++++++
.../org/apache/beam/sdk/io/hdfs/UGIHelper.java | 38 ++
.../java/org/apache/beam/sdk/io/hdfs/Write.java | 585 +++++++++++++++++
.../apache/beam/sdk/io/hdfs/package-info.java | 22 +
.../beam/sdk/io/hdfs/HDFSFileSinkTest.java | 172 +++++
.../beam/sdk/io/hdfs/HDFSFileSourceTest.java | 231 +++++++
.../sdk/io/hdfs/HadoopFileSystemModuleTest.java | 65 ++
.../HadoopFileSystemOptionsRegistrarTest.java | 49 ++
.../io/hdfs/HadoopFileSystemOptionsTest.java | 48 ++
.../io/hdfs/HadoopFileSystemRegistrarTest.java | 81 +++
.../beam/sdk/io/hdfs/HadoopFileSystemTest.java | 247 ++++++++
sdks/java/io/hdfs/README.md | 43 --
sdks/java/io/hdfs/pom.xml | 195 ------
.../apache/beam/sdk/io/hdfs/HDFSFileSink.java | 478 --------------
.../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 625 -------------------
.../beam/sdk/io/hdfs/HadoopFileSystem.java | 240 -------
.../sdk/io/hdfs/HadoopFileSystemModule.java | 84 ---
.../sdk/io/hdfs/HadoopFileSystemOptions.java | 49 --
.../hdfs/HadoopFileSystemOptionsRegistrar.java | 35 --
.../sdk/io/hdfs/HadoopFileSystemRegistrar.java | 62 --
.../beam/sdk/io/hdfs/HadoopResourceId.java | 81 ---
.../java/org/apache/beam/sdk/io/hdfs/Sink.java | 195 ------
.../org/apache/beam/sdk/io/hdfs/UGIHelper.java | 38 --
.../java/org/apache/beam/sdk/io/hdfs/Write.java | 585 -----------------
.../apache/beam/sdk/io/hdfs/package-info.java | 22 -
.../beam/sdk/io/hdfs/HDFSFileSinkTest.java | 172 -----
.../beam/sdk/io/hdfs/HDFSFileSourceTest.java | 231 -------
.../sdk/io/hdfs/HadoopFileSystemModuleTest.java | 65 --
.../HadoopFileSystemOptionsRegistrarTest.java | 49 --
.../io/hdfs/HadoopFileSystemOptionsTest.java | 48 --
.../io/hdfs/HadoopFileSystemRegistrarTest.java | 81 ---
.../beam/sdk/io/hdfs/HadoopFileSystemTest.java | 247 --------
sdks/java/io/pom.xml | 2 +-
43 files changed, 3626 insertions(+), 3626 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/README.md
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/README.md b/sdks/java/io/hadoop-file-system/README.md
new file mode 100644
index 0000000..3a734f2
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/README.md
@@ -0,0 +1,43 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# HDFS IO
+
+This library provides HDFS sources and sinks to make it possible to read and
+write Apache Hadoop file formats from Apache Beam pipelines.
+
+Currently, only the read path is implemented. An `HDFSFileSource` allows any
+Hadoop `FileInputFormat` to be read as a `PCollection`.
+
+An `HDFSFileSource` can be read using the
+`org.apache.beam.sdk.io.Read` transform. For example:
+
+```java
+HDFSFileSource<K, V> source = HDFSFileSource.from(path, MyInputFormat.class,
+ MyKey.class, MyValue.class);
+PCollection<KV<MyKey, MyValue>> records = pipeline.apply(Read.from(source));
+```
+
+Alternatively, the `readFrom` convenience method returns a read transform
+directly. For example:
+
+```java
+PCollection<KV<MyKey, MyValue>> records = pipeline.apply(HDFSFileSource.readFrom(path,
+ MyInputFormat.class, MyKey.class, MyValue.class));
+```
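+
+As a sketch (the path `hdfs://namenode/logs/*` below is purely illustrative), the
+`fromText` convenience method reads newline-delimited text files into a
+`PCollection<String>`:
+
+```java
+PCollection<String> lines = pipeline.apply(
+ Read.from(HDFSFileSource.fromText("hdfs://namenode/logs/*")));
+```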
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/pom.xml b/sdks/java/io/hadoop-file-system/pom.xml
new file mode 100644
index 0000000..3ec9848
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/pom.xml
@@ -0,0 +1,195 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-parent</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
+ <name>Apache Beam :: SDKs :: Java :: IO :: Hadoop File System</name>
+ <description>Library to read and write Hadoop/HDFS file formats from Beam.</description>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <beamUseDummyRunner>false</beamUseDummyRunner>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <properties>
+ <!--
+ This is the version of Hadoop used to compile the hadoop-common module.
+ This dependency is defined with a provided scope.
+ Users must supply their own Hadoop version at runtime.
+ -->
+ <hadoop.version>2.7.3</hadoop.version>
+ </properties>
+
+ <dependencyManagement>
+ <!--
+ We define dependencies here instead of in sdks/java/io because
+ of a version mismatch between this Hadoop version and the
+ Hadoop versions declared in other io submodules.
+ -->
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <classifier>tests</classifier>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-hadoop-common</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.auto.service</groupId>
+ <artifactId>auto-service</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.auto.value</groupId>
+ <artifactId>auto-value</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-mapred</artifactId>
+ <version>${avro.version}</version>
+ <classifier>hadoop2</classifier>
+ <exclusions>
+ <!-- exclude old Jetty version of servlet API -->
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- test dependencies -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
new file mode 100644
index 0000000..aee73c4
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import java.util.Random;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroKeyOutputFormat;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+/**
+ * A {@link Sink} for writing records to a Hadoop filesystem using a Hadoop
+ * file-based output format.
+ *
+ * <p>To write a {@link org.apache.beam.sdk.values.PCollection} of elements of type T to a Hadoop
+ * filesystem, use {@link HDFSFileSink#to}, specifying the path (this can be any Hadoop-supported
+ * filesystem: HDFS, S3, GCS, etc.), the Hadoop {@link FileOutputFormat}, the key class K, the
+ * value class V, and finally the {@link SerializableFunction} that maps from T to a {@link KV}
+ * of K and V.
+ *
+ * <p>{@code HDFSFileSink} can be used with {@link Write} to create a write
+ * transform. See the example below.
+ *
+ * <p>{@code HDFSFileSink} comes with helper methods to write text and Apache Avro. For example:
+ *
+ * <pre>
+ * {@code
+ * HDFSFileSink<CustomSpecificAvroClass, AvroKey<CustomSpecificAvroClass>, NullWritable> sink =
+ * HDFSFileSink.toAvro(path, AvroCoder.of(CustomSpecificAvroClass.class));
+ * avroRecordsPCollection.apply(Write.to(sink));
+ * }
+ * </pre>
+ *
+ * @param <T> the type of elements of the input {@link org.apache.beam.sdk.values.PCollection}.
+ * @param <K> the type of keys to be written to the sink via {@link FileOutputFormat}.
+ * @param <V> the type of values to be written to the sink via {@link FileOutputFormat}.
+ */
+@AutoValue
+@Experimental
+public abstract class HDFSFileSink<T, K, V> extends Sink<T> {
+
+ private static final JobID jobId = new JobID(
+ Long.toString(System.currentTimeMillis()),
+ new Random().nextInt(Integer.MAX_VALUE));
+
+ public abstract String path();
+ public abstract Class<? extends FileOutputFormat<K, V>> formatClass();
+ public abstract Class<K> keyClass();
+ public abstract Class<V> valueClass();
+ public abstract SerializableFunction<T, KV<K, V>> outputConverter();
+ public abstract SerializableConfiguration serializableConfiguration();
+ public @Nullable abstract String username();
+ public abstract boolean validate();
+
+ // =======================================================================
+ // Factory methods
+ // =======================================================================
+
+ public static <T, K, V, W extends FileOutputFormat<K, V>> HDFSFileSink<T, K, V>
+ to(String path,
+ Class<W> formatClass,
+ Class<K> keyClass,
+ Class<V> valueClass,
+ SerializableFunction<T, KV<K, V>> outputConverter) {
+ return HDFSFileSink.<T, K, V>builder()
+ .setPath(path)
+ .setFormatClass(formatClass)
+ .setKeyClass(keyClass)
+ .setValueClass(valueClass)
+ .setOutputConverter(outputConverter)
+ .setConfiguration(null)
+ .setUsername(null)
+ .setValidate(true)
+ .build();
+ }
+
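+ /**
+ * Helper to create a text sink that writes the {@code toString()} of each element as a line
+ * of text. A minimal usage sketch; the output path and the input {@code PCollection} here
+ * are assumptions:
+ *
+ * <pre>
+ * {@code
+ * HDFSFileSink<String, NullWritable, Text> sink = HDFSFileSink.toText("hdfs://namenode/out");
+ * stringsPCollection.apply(Write.to(sink));
+ * }
+ * </pre>
+ */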
+ public static <T> HDFSFileSink<T, NullWritable, Text> toText(String path) {
+ SerializableFunction<T, KV<NullWritable, Text>> outputConverter =
+ new SerializableFunction<T, KV<NullWritable, Text>>() {
+ @Override
+ public KV<NullWritable, Text> apply(T input) {
+ return KV.of(NullWritable.get(), new Text(input.toString()));
+ }
+ };
+ return to(path, TextOutputFormat.class, NullWritable.class, Text.class, outputConverter);
+ }
+
+ /**
+ * Helper to create an Avro sink given an {@link AvroCoder}. Keep in mind that the given
+ * configuration object is altered to enable Avro output.
+ */
+ public static <T> HDFSFileSink<T, AvroKey<T>, NullWritable> toAvro(String path,
+ final AvroCoder<T> coder,
+ Configuration conf) {
+ SerializableFunction<T, KV<AvroKey<T>, NullWritable>> outputConverter =
+ new SerializableFunction<T, KV<AvroKey<T>, NullWritable>>() {
+ @Override
+ public KV<AvroKey<T>, NullWritable> apply(T input) {
+ return KV.of(new AvroKey<>(input), NullWritable.get());
+ }
+ };
+ conf.set("avro.schema.output.key", coder.getSchema().toString());
+ return to(
+ path,
+ AvroKeyOutputFormat.class,
+ (Class<AvroKey<T>>) (Class<?>) AvroKey.class,
+ NullWritable.class,
+ outputConverter).withConfiguration(conf);
+ }
+
+ /**
+ * Helper to create an Avro sink given a {@link Schema}. Keep in mind that the given
+ * configuration object is altered to enable Avro output.
+ */
+ public static HDFSFileSink<GenericRecord, AvroKey<GenericRecord>, NullWritable>
+ toAvro(String path, Schema schema, Configuration conf) {
+ return toAvro(path, AvroCoder.of(schema), conf);
+ }
+
+ /**
+ * Helper to create an Avro sink given a {@link Class}. Keep in mind that the given
+ * configuration object is altered to enable Avro output.
+ */
+ public static <T> HDFSFileSink<T, AvroKey<T>, NullWritable> toAvro(String path,
+ Class<T> cls,
+ Configuration conf) {
+ return toAvro(path, AvroCoder.of(cls), conf);
+ }
+
+ // =======================================================================
+ // Builder methods
+ // =======================================================================
+
+ public abstract Builder<T, K, V> toBuilder();
+ public static <T, K, V> Builder<T, K, V> builder() {
+ return new AutoValue_HDFSFileSink.Builder<>();
+ }
+
+ /**
+ * AutoValue builder for {@link HDFSFileSink}.
+ */
+ @AutoValue.Builder
+ public abstract static class Builder<T, K, V> {
+ public abstract Builder<T, K, V> setPath(String path);
+ public abstract Builder<T, K, V> setFormatClass(
+ Class<? extends FileOutputFormat<K, V>> formatClass);
+ public abstract Builder<T, K, V> setKeyClass(Class<K> keyClass);
+ public abstract Builder<T, K, V> setValueClass(Class<V> valueClass);
+ public abstract Builder<T, K, V> setOutputConverter(
+ SerializableFunction<T, KV<K, V>> outputConverter);
+ public abstract Builder<T, K, V> setSerializableConfiguration(
+ SerializableConfiguration serializableConfiguration);
+ public Builder<T, K, V> setConfiguration(@Nullable Configuration configuration) {
+ if (configuration == null) {
+ configuration = new Configuration(false);
+ }
+ return this.setSerializableConfiguration(new SerializableConfiguration(configuration));
+ }
+ public abstract Builder<T, K, V> setUsername(String username);
+ public abstract Builder<T, K, V> setValidate(boolean validate);
+ public abstract HDFSFileSink<T, K, V> build();
+ }
+
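+ /**
+ * Returns a copy of this sink that uses the given Hadoop {@link Configuration}; a
+ * {@code null} configuration is replaced by {@code new Configuration(false)}. A sketch of
+ * chained usage; {@code conf}, the path, and the username are assumptions:
+ *
+ * <pre>
+ * {@code
+ * HDFSFileSink<String, NullWritable, Text> sink = HDFSFileSink.<String>toText(path)
+ * .withConfiguration(conf)
+ * .withUsername("alice");
+ * }
+ * </pre>
+ */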
+ public HDFSFileSink<T, K, V> withConfiguration(@Nullable Configuration configuration) {
+ return this.toBuilder().setConfiguration(configuration).build();
+ }
+
+ public HDFSFileSink<T, K, V> withUsername(@Nullable String username) {
+ return this.toBuilder().setUsername(username).build();
+ }
+
+ // =======================================================================
+ // Sink
+ // =======================================================================
+
+ @Override
+ public void validate(PipelineOptions options) {
+ if (validate()) {
+ try {
+ UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ FileSystem fs = FileSystem.get(new URI(path()),
+ SerializableConfiguration.newConfiguration(serializableConfiguration()));
+ checkState(!fs.exists(new Path(path())), "Output path %s already exists", path());
+ return null;
+ }
+ });
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public Sink.WriteOperation<T, String> createWriteOperation() {
+ return new HDFSWriteOperation<>(this, path(), formatClass());
+ }
+
+ private Job newJob() throws IOException {
+ Job job = SerializableConfiguration.newJob(serializableConfiguration());
+ job.setJobID(jobId);
+ job.setOutputKeyClass(keyClass());
+ job.setOutputValueClass(valueClass());
+ return job;
+ }
+
+ // =======================================================================
+ // WriteOperation
+ // =======================================================================
+
+ /** {@link WriteOperation} for HDFS. */
+ private static class HDFSWriteOperation<T, K, V> extends WriteOperation<T, String> {
+
+ private final HDFSFileSink<T, K, V> sink;
+ private final String path;
+ private final Class<? extends FileOutputFormat<K, V>> formatClass;
+
+ HDFSWriteOperation(HDFSFileSink<T, K, V> sink,
+ String path,
+ Class<? extends FileOutputFormat<K, V>> formatClass) {
+ this.sink = sink;
+ this.path = path;
+ this.formatClass = formatClass;
+ }
+
+ @Override
+ public void initialize(PipelineOptions options) throws Exception {
+ Job job = sink.newJob();
+ FileOutputFormat.setOutputPath(job, new Path(path));
+ }
+
+ @Override
+ public void setWindowedWrites(boolean windowedWrites) {
+ }
+
+ @Override
+ public void finalize(final Iterable<String> writerResults, PipelineOptions options)
+ throws Exception {
+ UGIHelper.getBestUGI(sink.username()).doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ doFinalize(writerResults);
+ return null;
+ }
+ });
+ }
+
+ private void doFinalize(Iterable<String> writerResults) throws Exception {
+ Job job = sink.newJob();
+ FileSystem fs = FileSystem.get(new URI(path), job.getConfiguration());
+ // If there are 0 output shards, just create output folder.
+ if (!writerResults.iterator().hasNext()) {
+ fs.mkdirs(new Path(path));
+ return;
+ }
+
+ // job successful
+ JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+ FileOutputCommitter outputCommitter = new FileOutputCommitter(new Path(path), context);
+ outputCommitter.commitJob(context);
+
+ // get actual output shards
+ Set<String> actual = Sets.newHashSet();
+ FileStatus[] statuses = fs.listStatus(new Path(path), new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ String name = path.getName();
+ return !name.startsWith("_") && !name.startsWith(".");
+ }
+ });
+
+ // get expected output shards
+ Set<String> expected = Sets.newHashSet(writerResults);
+ checkState(
+ expected.size() == Lists.newArrayList(writerResults).size(),
+ "Data loss due to writer results hash collision");
+ for (FileStatus s : statuses) {
+ String name = s.getPath().getName();
+ int pos = name.indexOf('.');
+ actual.add(pos > 0 ? name.substring(0, pos) : name);
+ }
+
+ checkState(actual.equals(expected), "Writer results and output files do not match");
+
+ // rename output shards to Hadoop style, e.g. part-r-00000.txt
+ int i = 0;
+ for (FileStatus s : statuses) {
+ String name = s.getPath().getName();
+ int pos = name.indexOf('.');
+ String ext = pos > 0 ? name.substring(pos) : "";
+ fs.rename(
+ s.getPath(),
+ new Path(s.getPath().getParent(), String.format("part-r-%05d%s", i, ext)));
+ i++;
+ }
+ }
+
+ @Override
+ public Writer<T, String> createWriter(PipelineOptions options) throws Exception {
+ return new HDFSWriter<>(this, path, formatClass);
+ }
+
+ @Override
+ public Sink<T> getSink() {
+ return sink;
+ }
+
+ @Override
+ public Coder<String> getWriterResultCoder() {
+ return StringUtf8Coder.of();
+ }
+
+ }
+
+ // =======================================================================
+ // Writer
+ // =======================================================================
+
+ private static class HDFSWriter<T, K, V> extends Writer<T, String> {
+
+ private final HDFSWriteOperation<T, K, V> writeOperation;
+ private final String path;
+ private final Class<? extends FileOutputFormat<K, V>> formatClass;
+
+ // unique hash for each task
+ private int hash;
+
+ private TaskAttemptContext context;
+ private RecordWriter<K, V> recordWriter;
+ private FileOutputCommitter outputCommitter;
+
+ HDFSWriter(HDFSWriteOperation<T, K, V> writeOperation,
+ String path,
+ Class<? extends FileOutputFormat<K, V>> formatClass) {
+ this.writeOperation = writeOperation;
+ this.path = path;
+ this.formatClass = formatClass;
+ }
+
+ @Override
+ public void openWindowed(final String uId,
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ int shard,
+ int numShards) throws Exception {
+ throw new UnsupportedOperationException("Windowing support not implemented yet for"
+ + "HDFS. Window " + window);
+ }
+
+ @Override
+ public void openUnwindowed(final String uId, int shard, int numShards) throws Exception {
+ UGIHelper.getBestUGI(writeOperation.sink.username()).doAs(
+ new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ doOpen(uId);
+ return null;
+ }
+ }
+ );
+ }
+
+ private void doOpen(String uId) throws Exception {
+ this.hash = uId.hashCode();
+
+ Job job = writeOperation.sink.newJob();
+ FileOutputFormat.setOutputPath(job, new Path(path));
+
+ // Each Writer is responsible for writing one bundle of elements and is represented by one
+ // unique Hadoop task based on uId/hash. All tasks share the same job ID.
+ JobID jobId = job.getJobID();
+ TaskID taskId = new TaskID(jobId, TaskType.REDUCE, hash);
+ context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID(taskId, 0));
+
+ FileOutputFormat<K, V> outputFormat = formatClass.newInstance();
+ recordWriter = outputFormat.getRecordWriter(context);
+ outputCommitter = (FileOutputCommitter) outputFormat.getOutputCommitter(context);
+ }
+
+ @Override
+ public void write(T value) throws Exception {
+ checkNotNull(recordWriter,
+ "Record writer can't be null. Make sure to open Writer first!");
+ KV<K, V> kv = writeOperation.sink.outputConverter().apply(value);
+ recordWriter.write(kv.getKey(), kv.getValue());
+ }
+
+ @Override
+ public void cleanup() throws Exception {
+
+ }
+
+ @Override
+ public String close() throws Exception {
+ return UGIHelper.getBestUGI(writeOperation.sink.username()).doAs(
+ new PrivilegedExceptionAction<String>() {
+ @Override
+ public String run() throws Exception {
+ return doClose();
+ }
+ });
+ }
+
+ private String doClose() throws Exception {
+ // task/attempt successful
+ recordWriter.close(context);
+ outputCommitter.commitTask(context);
+
+ // the result is the prefix of the output file name
+ return String.format("part-r-%d", hash);
+ }
+
+ @Override
+ public WriteOperation<T, String> getWriteOperation() {
+ return writeOperation;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
new file mode 100644
index 0000000..5cc2097
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
@@ -0,0 +1,625 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.security.PrivilegedExceptionAction;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroKeyInputFormat;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
+import org.apache.beam.sdk.io.hadoop.WritableCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@code BoundedSource} for reading files resident in a Hadoop filesystem using a
+ * Hadoop file-based input format.
+ *
+ * <p>To read a {@link org.apache.beam.sdk.values.PCollection} of
+ * {@link org.apache.beam.sdk.values.KV} key-value pairs from one or more
+ * Hadoop files, use {@link HDFSFileSource#from} to specify the path(s) of the files to
+ * read, the Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}, the
+ * key class and the value class.
+ *
+ * <p>An {@code HDFSFileSource} can be read using the
+ * {@link org.apache.beam.sdk.io.Read} transform. For example:
+ *
+ * <pre>
+ * {@code
+ * HDFSFileSource<K, V> source = HDFSFileSource.from(path, MyInputFormat.class,
+ * MyKey.class, MyValue.class);
+ * PCollection<KV<MyKey, MyValue>> records = pipeline.apply(Read.from(source));
+ * }
+ * </pre>
+ *
+ * <p>Implementation note: Since Hadoop's
+ * {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}
+ * determines the input splits, this class extends {@link BoundedSource} rather than
+ * {@link org.apache.beam.sdk.io.OffsetBasedSource}, because the latter
+ * dictates how the input is split.
+ *
+ * @param <T> the type of elements of the result {@link org.apache.beam.sdk.values.PCollection}.
+ * @param <K> the type of keys to be read from the source via {@link FileInputFormat}.
+ * @param <V> the type of values to be read from the source via {@link FileInputFormat}.
+ */
+@AutoValue
+@Experimental
+public abstract class HDFSFileSource<T, K, V> extends BoundedSource<T> {
+ private static final long serialVersionUID = 0L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(HDFSFileSource.class);
+
+ public abstract String filepattern();
+ public abstract Class<? extends FileInputFormat<K, V>> formatClass();
+ public abstract Coder<T> coder();
+ public abstract SerializableFunction<KV<K, V>, T> inputConverter();
+ public abstract SerializableConfiguration serializableConfiguration();
+ public @Nullable abstract SerializableSplit serializableSplit();
+ public @Nullable abstract String username();
+ public abstract boolean validateSource();
+
+ // =======================================================================
+ // Factory methods
+ // =======================================================================
+
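+ /**
+ * Creates a source given the format class, an explicit element {@link Coder}, and a
+ * function converting each Hadoop key-value pair into an element. A sketch mirroring
+ * {@link #fromText} ({@code filepattern} and {@code inputConverter} are assumed to be
+ * caller-supplied):
+ *
+ * <pre>
+ * {@code
+ * HDFSFileSource<String, LongWritable, Text> source = HDFSFileSource.from(
+ * filepattern, TextInputFormat.class, StringUtf8Coder.of(), inputConverter);
+ * }
+ * </pre>
+ */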
+ public static <T, K, V, W extends FileInputFormat<K, V>> HDFSFileSource<T, K, V>
+ from(String filepattern,
+ Class<W> formatClass,
+ Coder<T> coder,
+ SerializableFunction<KV<K, V>, T> inputConverter) {
+ return HDFSFileSource.<T, K, V>builder()
+ .setFilepattern(filepattern)
+ .setFormatClass(formatClass)
+ .setCoder(coder)
+ .setInputConverter(inputConverter)
+ .setConfiguration(null)
+ .setUsername(null)
+ .setValidateSource(true)
+ .setSerializableSplit(null)
+ .build();
+ }
+
+ public static <K, V, W extends FileInputFormat<K, V>> HDFSFileSource<KV<K, V>, K, V>
+ from(String filepattern,
+ Class<W> formatClass,
+ Class<K> keyClass,
+ Class<V> valueClass) {
+ KvCoder<K, V> coder = KvCoder.of(getDefaultCoder(keyClass), getDefaultCoder(valueClass));
+ SerializableFunction<KV<K, V>, KV<K, V>> inputConverter =
+ new SerializableFunction<KV<K, V>, KV<K, V>>() {
+ @Override
+ public KV<K, V> apply(KV<K, V> input) {
+ return input;
+ }
+ };
+ return HDFSFileSource.<KV<K, V>, K, V>builder()
+ .setFilepattern(filepattern)
+ .setFormatClass(formatClass)
+ .setCoder(coder)
+ .setInputConverter(inputConverter)
+ .setConfiguration(null)
+ .setUsername(null)
+ .setValidateSource(true)
+ .setSerializableSplit(null)
+ .build();
+ }
+
+ public static HDFSFileSource<String, LongWritable, Text>
+ fromText(String filepattern) {
+ SerializableFunction<KV<LongWritable, Text>, String> inputConverter =
+ new SerializableFunction<KV<LongWritable, Text>, String>() {
+ @Override
+ public String apply(KV<LongWritable, Text> input) {
+ return input.getValue().toString();
+ }
+ };
+ return from(filepattern, TextInputFormat.class, StringUtf8Coder.of(), inputConverter);
+ }
+
+ /**
+ * Helper to read from an Avro source given an {@link AvroCoder}. Keep in mind that the given
+ * configuration object is altered to enable Avro input.
+ */
+ public static <T> HDFSFileSource<T, AvroKey<T>, NullWritable>
+ fromAvro(String filepattern, final AvroCoder<T> coder, Configuration conf) {
+ Class<AvroKeyInputFormat<T>> formatClass = castClass(AvroKeyInputFormat.class);
+ SerializableFunction<KV<AvroKey<T>, NullWritable>, T> inputConverter =
+ new SerializableFunction<KV<AvroKey<T>, NullWritable>, T>() {
+ @Override
+ public T apply(KV<AvroKey<T>, NullWritable> input) {
+ try {
+ return CoderUtils.clone(coder, input.getKey().datum());
+ } catch (CoderException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ conf.set("avro.schema.input.key", coder.getSchema().toString());
+ return from(filepattern, formatClass, coder, inputConverter).withConfiguration(conf);
+ }
+
+ /**
+ * Helper to read from an Avro source given a {@link Schema}. Keep in mind that the given
+ * configuration object is altered to enable Avro input.
+ */
+ public static HDFSFileSource<GenericRecord, AvroKey<GenericRecord>, NullWritable>
+ fromAvro(String filepattern, Schema schema, Configuration conf) {
+ return fromAvro(filepattern, AvroCoder.of(schema), conf);
+ }
+
+ /**
+ * Helper to read from an Avro source given a {@link Class}. Keep in mind that the given
+ * configuration object is altered to enable Avro input.
+ */
+ public static <T> HDFSFileSource<T, AvroKey<T>, NullWritable>
+ fromAvro(String filepattern, Class<T> cls, Configuration conf) {
+ return fromAvro(filepattern, AvroCoder.of(cls), conf);
+ }
+
+ // =======================================================================
+ // Builder methods
+ // =======================================================================
+
+ public abstract HDFSFileSource.Builder<T, K, V> toBuilder();
+ public static <T, K, V> HDFSFileSource.Builder<T, K, V> builder() {
+ return new AutoValue_HDFSFileSource.Builder<>();
+ }
+
+ /**
+ * AutoValue builder for {@link HDFSFileSource}.
+ */
+ @AutoValue.Builder
+ public abstract static class Builder<T, K, V> {
+ public abstract Builder<T, K, V> setFilepattern(String filepattern);
+ public abstract Builder<T, K, V> setFormatClass(
+ Class<? extends FileInputFormat<K, V>> formatClass);
+ public abstract Builder<T, K, V> setCoder(Coder<T> coder);
+ public abstract Builder<T, K, V> setInputConverter(
+ SerializableFunction<KV<K, V>, T> inputConverter);
+ public abstract Builder<T, K, V> setSerializableConfiguration(
+ SerializableConfiguration serializableConfiguration);
+ public Builder<T, K, V> setConfiguration(Configuration configuration) {
+ if (configuration == null) {
+ configuration = new Configuration(false);
+ }
+ return this.setSerializableConfiguration(new SerializableConfiguration(configuration));
+ }
+ public abstract Builder<T, K, V> setSerializableSplit(SerializableSplit serializableSplit);
+ public abstract Builder<T, K, V> setUsername(@Nullable String username);
+ public abstract Builder<T, K, V> setValidateSource(boolean validate);
+ public abstract HDFSFileSource<T, K, V> build();
+ }
+
+ public HDFSFileSource<T, K, V> withConfiguration(@Nullable Configuration configuration) {
+ return this.toBuilder().setConfiguration(configuration).build();
+ }
+
+ public HDFSFileSource<T, K, V> withUsername(@Nullable String username) {
+ return this.toBuilder().setUsername(username).build();
+ }
+
+ // =======================================================================
+ // BoundedSource
+ // =======================================================================
+
+ @Override
+ public List<? extends BoundedSource<T>> split(
+ final long desiredBundleSizeBytes,
+ PipelineOptions options) throws Exception {
+ if (serializableSplit() == null) {
+ List<InputSplit> inputSplits = UGIHelper.getBestUGI(username()).doAs(
+ new PrivilegedExceptionAction<List<InputSplit>>() {
+ @Override
+ public List<InputSplit> run() throws Exception {
+ return computeSplits(desiredBundleSizeBytes, serializableConfiguration());
+ }
+ });
+ return Lists.transform(inputSplits,
+ new Function<InputSplit, BoundedSource<T>>() {
+ @Override
+ public BoundedSource<T> apply(@Nullable InputSplit inputSplit) {
+ SerializableSplit serializableSplit = new SerializableSplit(inputSplit);
+ return HDFSFileSource.this.toBuilder()
+ .setSerializableSplit(serializableSplit)
+ .build();
+ }
+ });
+ } else {
+ return ImmutableList.of(this);
+ }
+ }
+
+ @Override
+ public long getEstimatedSizeBytes(PipelineOptions options) {
+ long size = 0;
+
+ try {
+ // If this source represents a single split produced by a previous call to split,
+ // then return the size of that split rather than the size of the entire input.
+ if (serializableSplit() != null) {
+ return serializableSplit().getSplit().getLength();
+ }
+
+ size += UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Long>() {
+ @Override
+ public Long run() throws Exception {
+ long size = 0;
+ Job job = SerializableConfiguration.newJob(serializableConfiguration());
+ for (FileStatus st : listStatus(createFormat(job), job)) {
+ size += st.getLen();
+ }
+ return size;
+ }
+ });
+ } catch (IOException e) {
+ LOG.warn(
+ "Will estimate size of input to be 0 bytes. Can't estimate size of the input due to:", e);
+ // ignore, and return 0
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn(
+ "Will estimate size of input to be 0 bytes. Can't estimate size of the input due to:", e);
+ // ignore, and return 0
+ }
+ return size;
+ }
+
+ @Override
+ public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
+ this.validate();
+ return new HDFSFileReader<>(this, filepattern(), formatClass(), serializableSplit());
+ }
+
+ @Override
+ public void validate() {
+ if (validateSource()) {
+ try {
+ UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ final Path pathPattern = new Path(filepattern());
+ FileSystem fs = FileSystem.get(pathPattern.toUri(),
+ SerializableConfiguration.newConfiguration(serializableConfiguration()));
+ FileStatus[] fileStatuses = fs.globStatus(pathPattern);
+ checkState(
+ fileStatuses != null && fileStatuses.length > 0,
+ "Unable to find any files matching %s", filepattern());
+ return null;
+ }
+ });
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public Coder<T> getDefaultOutputCoder() {
+ return coder();
+ }
+
+ // =======================================================================
+ // Helpers
+ // =======================================================================
+
+ private List<InputSplit> computeSplits(long desiredBundleSizeBytes,
+ SerializableConfiguration serializableConfiguration)
+ throws IOException, IllegalAccessException, InstantiationException {
+ Job job = SerializableConfiguration.newJob(serializableConfiguration);
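+ // Constrain Hadoop's split computation so that each InputSplit approximates the
+ // bundle size desired by the Beam runner.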
+ FileInputFormat.setMinInputSplitSize(job, desiredBundleSizeBytes);
+ FileInputFormat.setMaxInputSplitSize(job, desiredBundleSizeBytes);
+ return createFormat(job).getSplits(job);
+ }
+
+ private FileInputFormat<K, V> createFormat(Job job)
+ throws IOException, IllegalAccessException, InstantiationException {
+ Path path = new Path(filepattern());
+ FileInputFormat.addInputPath(job, path);
+ return formatClass().newInstance();
+ }
+
+ private List<FileStatus> listStatus(FileInputFormat<K, V> format, Job job)
+ throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+ // FileInputFormat#listStatus is protected, so call using reflection
+ Method listStatus = FileInputFormat.class.getDeclaredMethod("listStatus", JobContext.class);
+ listStatus.setAccessible(true);
+ @SuppressWarnings("unchecked")
+ List<FileStatus> stat = (List<FileStatus>) listStatus.invoke(format, job);
+ return stat;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> Coder<T> getDefaultCoder(Class<T> c) {
+ if (Writable.class.isAssignableFrom(c)) {
+ Class<? extends Writable> writableClass = (Class<? extends Writable>) c;
+ return (Coder<T>) WritableCoder.of(writableClass);
+ } else if (Void.class.equals(c)) {
+ return (Coder<T>) VoidCoder.of();
+ }
+ // TODO: how to use registered coders here?
+ throw new IllegalStateException("Cannot find coder for " + c);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> Class<T> castClass(Class<?> aClass) {
+ return (Class<T>) aClass;
+ }
+
+ // =======================================================================
+ // BoundedReader
+ // =======================================================================
+
+ private static class HDFSFileReader<T, K, V> extends BoundedSource.BoundedReader<T> {
+
+ private final HDFSFileSource<T, K, V> source;
+ private final String filepattern;
+ private final Class<? extends FileInputFormat<K, V>> formatClass;
+ private final Job job;
+
+ private List<InputSplit> splits;
+ private ListIterator<InputSplit> splitsIterator;
+
+ private Configuration conf;
+ private FileInputFormat<?, ?> format;
+ private TaskAttemptContext attemptContext;
+ private RecordReader<K, V> currentReader;
+ private KV<K, V> currentPair;
+
+ HDFSFileReader(
+ HDFSFileSource<T, K, V> source,
+ String filepattern,
+ Class<? extends FileInputFormat<K, V>> formatClass,
+ SerializableSplit serializableSplit)
+ throws IOException {
+ this.source = source;
+ this.filepattern = filepattern;
+ this.formatClass = formatClass;
+ this.job = SerializableConfiguration.newJob(source.serializableConfiguration());
+
+ if (serializableSplit != null) {
+ this.splits = ImmutableList.of(serializableSplit.getSplit());
+ this.splitsIterator = splits.listIterator();
+ }
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ Path path = new Path(filepattern);
+ FileInputFormat.addInputPath(job, path);
+
+ conf = job.getConfiguration();
+ try {
+ format = formatClass.newInstance();
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new IOException("Cannot instantiate file input format " + formatClass, e);
+ }
+ attemptContext = new TaskAttemptContextImpl(conf, new TaskAttemptID());
+
+ if (splitsIterator == null) {
+ splits = format.getSplits(job);
+ splitsIterator = splits.listIterator();
+ }
+
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ try {
+ if (currentReader != null && currentReader.nextKeyValue()) {
+ currentPair = nextPair();
+ return true;
+ } else {
+ while (splitsIterator.hasNext()) {
+ // advance the reader and see if it has records
+ final InputSplit nextSplit = splitsIterator.next();
+ @SuppressWarnings("unchecked")
+ RecordReader<K, V> reader =
+ (RecordReader<K, V>) format.createRecordReader(nextSplit, attemptContext);
+ if (currentReader != null) {
+ currentReader.close();
+ }
+ currentReader = reader;
+ UGIHelper.getBestUGI(source.username()).doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ currentReader.initialize(nextSplit, attemptContext);
+ return null;
+ }
+ });
+ if (currentReader.nextKeyValue()) {
+ currentPair = nextPair();
+ return true;
+ }
+ currentReader.close();
+ currentReader = null;
+ }
+ // either no next split or all readers were empty
+ currentPair = null;
+ return false;
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public T getCurrent() throws NoSuchElementException {
+ if (currentPair == null) {
+ throw new NoSuchElementException();
+ }
+ return source.inputConverter().apply(currentPair);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (currentReader != null) {
+ currentReader.close();
+ currentReader = null;
+ }
+ currentPair = null;
+ }
+
+ @Override
+ public BoundedSource<T> getCurrentSource() {
+ return source;
+ }
+
+ @SuppressWarnings("unchecked")
+ private KV<K, V> nextPair() throws IOException, InterruptedException {
+ K key = currentReader.getCurrentKey();
+ V value = currentReader.getCurrentValue();
+ // clone Writable objects since they are reused between calls to RecordReader#nextKeyValue
+ if (key instanceof Writable) {
+ key = (K) WritableUtils.clone((Writable) key, conf);
+ }
+ if (value instanceof Writable) {
+ value = (V) WritableUtils.clone((Writable) value, conf);
+ }
+ return KV.of(key, value);
+ }
+
+ // =======================================================================
+ // Optional overrides
+ // =======================================================================
+
+ @Override
+ public Double getFractionConsumed() {
+ if (currentReader == null) {
+ return 0.0;
+ }
+ if (splits.isEmpty()) {
+ return 1.0;
+ }
+ int index = splitsIterator.previousIndex();
+ int numReaders = splits.size();
+ if (index == numReaders) {
+ return 1.0;
+ }
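+ // Linearly interpolate: treat each split as an equal fraction of the input and add the
+ // current reader's reported progress within its own split.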
+ double before = 1.0 * index / numReaders;
+ double after = 1.0 * (index + 1) / numReaders;
+ Double fractionOfCurrentReader = getProgress();
+ if (fractionOfCurrentReader == null) {
+ return before;
+ }
+ return before + fractionOfCurrentReader * (after - before);
+ }
+
+ private Double getProgress() {
+ try {
+ return (double) currentReader.getProgress();
+ } catch (IOException | InterruptedException e) {
+ return null;
+ }
+ }
+
+ }
+
+ // =======================================================================
+ // SerializableSplit
+ // =======================================================================
+
+ /**
+ * A wrapper to allow Hadoop {@link org.apache.hadoop.mapreduce.InputSplit}s to be
+ * serialized using Java's standard serialization mechanisms. Note that the wrapped
+ * InputSplit has to be {@link Writable} (as most are).
+ */
+ protected static class SerializableSplit implements Externalizable {
+ private static final long serialVersionUID = 0L;
+
+ private InputSplit split;
+
+ public SerializableSplit() {
+ }
+
+ public SerializableSplit(InputSplit split) {
+ checkArgument(split instanceof Writable, "Split is not writable: %s", split);
+ this.split = split;
+ }
+
+ public InputSplit getSplit() {
+ return split;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeUTF(split.getClass().getCanonicalName());
+ ((Writable) split).write(out);
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ String className = in.readUTF();
+ try {
+ split = (InputSplit) Class.forName(className).newInstance();
+ ((Writable) split).readFields(in);
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
new file mode 100644
index 0000000..154a818
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.fs.CreateOptions;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.MatchResult.Status;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Adapts {@link org.apache.hadoop.fs.FileSystem} connectors to be used as
+ * Apache Beam {@link FileSystem FileSystems}.
+ *
+ * <p>The following Hadoop FileSystems are known to be unsupported:
+ * <ul>
+ * <li>FTPFileSystem: Missing seek support within FTPInputStream</li>
+ * </ul>
+ *
+ * <p>This implementation assumes that the underlying Hadoop {@link FileSystem} supports
+ * efficient seeking when reading. The following {@link FSInputStream} implementations
+ * (as of Hadoop 2.7.1) provide seek implementations:
+ * <ul>
+ * <li>HarFsInputStream</li>
+ * <li>S3InputStream</li>
+ * <li>DFSInputStream</li>
+ * <li>SwiftNativeInputStream</li>
+ * <li>NativeS3FsInputStream</li>
+ * <li>LocalFSFileInputStream</li>
+ * <li>NativeAzureFsInputStream</li>
+ * <li>S3AInputStream</li>
+ * </ul>
+ */
+class HadoopFileSystem extends FileSystem<HadoopResourceId> {
+ @VisibleForTesting
+ final org.apache.hadoop.fs.FileSystem fileSystem;
+
+ HadoopFileSystem(Configuration configuration) throws IOException {
+ this.fileSystem = org.apache.hadoop.fs.FileSystem.newInstance(configuration);
+ }
+
+ @Override
+ protected List<MatchResult> match(List<String> specs) {
+ ImmutableList.Builder<MatchResult> resultsBuilder = ImmutableList.builder();
+ for (String spec : specs) {
+ try {
+ FileStatus[] fileStatuses = fileSystem.globStatus(new Path(spec));
+ List<Metadata> metadata = new ArrayList<>();
+ for (FileStatus fileStatus : fileStatuses) {
+ if (fileStatus.isFile()) {
+ metadata.add(Metadata.builder()
+ .setResourceId(new HadoopResourceId(fileStatus.getPath().toUri()))
+ .setIsReadSeekEfficient(true)
+ .setSizeBytes(fileStatus.getLen())
+ .build());
+ }
+ }
+ resultsBuilder.add(MatchResult.create(Status.OK, metadata));
+ } catch (IOException e) {
+ resultsBuilder.add(MatchResult.create(Status.ERROR, e));
+ }
+ }
+ return resultsBuilder.build();
+ }
+
+ @Override
+ protected WritableByteChannel create(HadoopResourceId resourceId, CreateOptions createOptions)
+ throws IOException {
+ return Channels.newChannel(fileSystem.create(resourceId.toPath()));
+ }
+
+ @Override
+ protected ReadableByteChannel open(HadoopResourceId resourceId) throws IOException {
+ FileStatus fileStatus = fileSystem.getFileStatus(resourceId.toPath());
+ return new HadoopSeekableByteChannel(fileStatus, fileSystem.open(resourceId.toPath()));
+ }
+
+ @Override
+ protected void copy(
+ List<HadoopResourceId> srcResourceIds,
+ List<HadoopResourceId> destResourceIds) throws IOException {
+ for (int i = 0; i < srcResourceIds.size(); ++i) {
+ // Unfortunately, Hadoop FileSystems don't support a native copy operation, so we are forced
+ // to use the inefficient implementation found in FileUtil, which copies all the bytes through
+ // the local machine.
+ //
+ // The Hadoop FileSystem API does define a concat method, but we could only find
+ // DFSFileSystem implementing it, and that implementation deletes the sources afterwards,
+ // which is not what we want. All the other FileSystem implementations we looked at threw
+ // UnsupportedOperationException from within concat.
+ FileUtil.copy(
+ fileSystem, srcResourceIds.get(i).toPath(),
+ fileSystem, destResourceIds.get(i).toPath(),
+ false,
+ true,
+ fileSystem.getConf());
+ }
+ }
+
+ @Override
+ protected void rename(
+ List<HadoopResourceId> srcResourceIds,
+ List<HadoopResourceId> destResourceIds) throws IOException {
+    for (int i = 0; i < srcResourceIds.size(); ++i) {
+      // Hadoop's rename reports failure via its return value, so check it.
+      if (!fileSystem.rename(
+          srcResourceIds.get(i).toPath(),
+          destResourceIds.get(i).toPath())) {
+        throw new IOException(String.format(
+            "Unable to rename %s to %s.", srcResourceIds.get(i), destResourceIds.get(i)));
+      }
+    }
+ }
+
+ @Override
+ protected void delete(Collection<HadoopResourceId> resourceIds) throws IOException {
+    for (HadoopResourceId resourceId : resourceIds) {
+      // Non-recursive delete: removes files and empty directories only.
+      fileSystem.delete(resourceId.toPath(), false);
+    }
+ }
+
+ @Override
+ protected HadoopResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
+ try {
+ if (singleResourceSpec.endsWith("/") && !isDirectory) {
+ throw new IllegalArgumentException(String.format(
+ "Expected file path but received directory path %s", singleResourceSpec));
+ }
+ return !singleResourceSpec.endsWith("/") && isDirectory
+ ? new HadoopResourceId(new URI(singleResourceSpec + "/"))
+ : new HadoopResourceId(new URI(singleResourceSpec));
+ } catch (URISyntaxException e) {
+      throw new IllegalArgumentException(
+          String.format("Invalid spec %s (isDirectory: %s)", singleResourceSpec, isDirectory),
+          e);
+ }
+ }
+
+ @Override
+ protected String getScheme() {
+ return fileSystem.getScheme();
+ }
+
+ /** An adapter around {@link FSDataInputStream} that implements {@link SeekableByteChannel}. */
+ private static class HadoopSeekableByteChannel implements SeekableByteChannel {
+ private final FileStatus fileStatus;
+ private final FSDataInputStream inputStream;
+ private boolean closed;
+
+ private HadoopSeekableByteChannel(FileStatus fileStatus, FSDataInputStream inputStream) {
+ this.fileStatus = fileStatus;
+ this.inputStream = inputStream;
+ this.closed = false;
+ }
+
+ @Override
+ public int read(ByteBuffer dst) throws IOException {
+ if (closed) {
+ throw new IOException("Channel is closed");
+ }
+      // Assumes the underlying stream supports ByteBufferReadable; streams that do
+      // not will throw UnsupportedOperationException from this read.
+      return inputStream.read(dst);
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long position() throws IOException {
+ if (closed) {
+ throw new IOException("Channel is closed");
+ }
+ return inputStream.getPos();
+ }
+
+ @Override
+ public SeekableByteChannel position(long newPosition) throws IOException {
+ if (closed) {
+ throw new IOException("Channel is closed");
+ }
+ inputStream.seek(newPosition);
+ return this;
+ }
+
+ @Override
+ public long size() throws IOException {
+ if (closed) {
+ throw new IOException("Channel is closed");
+ }
+ return fileStatus.getLen();
+ }
+
+ @Override
+ public SeekableByteChannel truncate(long size) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return !closed;
+ }
+
+ @Override
+ public void close() throws IOException {
+ closed = true;
+ inputStream.close();
+ }
+ }
+}
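
For orientation (illustrative, not part of this commit): a minimal sketch of how
this adapter is typically exercised through the Beam FileSystems facade once a
Hadoop Configuration has been supplied via HadoopFileSystemOptions. The URI and
buffer size are placeholders, and the facade method signatures are assumed to
match the Java SDK of this era.

    import java.nio.ByteBuffer;
    import java.nio.channels.ReadableByteChannel;
    import java.util.Collections;
    import java.util.List;
    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.io.fs.MatchResult;

    public class HadoopFileSystemUsageSketch {
      public static void main(String[] args) throws Exception {
        // Glob for files; for hdfs:// specs this delegates to HadoopFileSystem.match().
        List<MatchResult> results =
            FileSystems.match(Collections.singletonList("hdfs://localhost:9998/tmp/*"));
        for (MatchResult.Metadata metadata : results.get(0).metadata()) {
          // open() hands back the HadoopSeekableByteChannel defined above.
          try (ReadableByteChannel channel = FileSystems.open(metadata.resourceId())) {
            ByteBuffer buffer = ByteBuffer.allocate(64 * 1024);
            while (channel.read(buffer) != -1) {
              buffer.clear(); // process buffer contents here
            }
          }
        }
      }
    }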
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java
new file mode 100644
index 0000000..2cb9d8a
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A Jackson {@link Module} that registers a {@link JsonSerializer} and {@link JsonDeserializer}
+ * for a Hadoop {@link Configuration}. The serialized representation is that of a JSON map.
+ *
+ * <p>Note that the serialization of the Hadoop {@link Configuration} keeps only the keys and
+ * their values, dropping any configuration hierarchy and source information.
+ */
+@AutoService(Module.class)
+public class HadoopFileSystemModule extends SimpleModule {
+ public HadoopFileSystemModule() {
+ super("HadoopFileSystemModule");
+ setMixInAnnotation(Configuration.class, ConfigurationMixin.class);
+ }
+
+ /** A mixin class to add Jackson annotations to the Hadoop {@link Configuration} class. */
+ @JsonDeserialize(using = ConfigurationDeserializer.class)
+ @JsonSerialize(using = ConfigurationSerializer.class)
+ private static class ConfigurationMixin {}
+
+ /** A Jackson {@link JsonDeserializer} for Hadoop {@link Configuration} objects. */
+ static class ConfigurationDeserializer extends JsonDeserializer<Configuration> {
+ @Override
+ public Configuration deserialize(JsonParser jsonParser,
+ DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
+ Map<String, String> rawConfiguration =
+ jsonParser.readValueAs(new TypeReference<Map<String, String>>() {});
+ Configuration configuration = new Configuration(false);
+ for (Map.Entry<String, String> entry : rawConfiguration.entrySet()) {
+ configuration.set(entry.getKey(), entry.getValue());
+ }
+ return configuration;
+ }
+ }
+
+ /** A Jackson {@link JsonSerializer} for Hadoop {@link Configuration} objects. */
+ static class ConfigurationSerializer extends JsonSerializer<Configuration> {
+ @Override
+ public void serialize(Configuration configuration, JsonGenerator jsonGenerator,
+ SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
+ Map<String, String> map = new TreeMap<>();
+ for (Map.Entry<String, String> entry : configuration) {
+ map.put(entry.getKey(), entry.getValue());
+ }
+ jsonGenerator.writeObject(map);
+ }
+ }
+}
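
A small round-trip sketch for the module above (illustrative, not part of this
commit); any ObjectMapper with the module registered behaves the same way, and
the host/port is a placeholder.

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.beam.sdk.io.hdfs.HadoopFileSystemModule;
    import org.apache.hadoop.conf.Configuration;

    public class ConfigurationJsonSketch {
      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper().registerModule(new HadoopFileSystemModule());

        Configuration configuration = new Configuration(false);
        configuration.set("fs.default.name", "hdfs://localhost:9998");

        // Serializes to a flat JSON map: {"fs.default.name":"hdfs://localhost:9998"}
        String json = mapper.writeValueAsString(configuration);

        // Deserializing restores the keys and values, but not hierarchy or source info.
        Configuration roundTripped = mapper.readValue(json, Configuration.class);
        System.out.println(roundTripped.get("fs.default.name"));
      }
    }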
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
new file mode 100644
index 0000000..31250bc
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import java.util.List;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * {@link PipelineOptions} which encapsulate {@link Configuration Hadoop Configuration}
+ * for the {@link HadoopFileSystem}.
+ */
+public interface HadoopFileSystemOptions extends PipelineOptions {
+ @Description("A list of Hadoop configurations used to configure zero or more Hadoop filesystems. "
+ + "To specify on the command-line, represent the value as a JSON list of JSON maps, where "
+ + "each map represents the entire configuration for a single Hadoop filesystem. For example "
+ + "--hdfsConfiguration='[{\"fs.default.name\": \"hdfs://localhost:9998\", ...},"
+ + "{\"fs.default.name\": \"s3a://\", ...},...]'")
+ @Default.InstanceFactory(ConfigurationLocator.class)
+ List<Configuration> getHdfsConfiguration();
+ void setHdfsConfiguration(List<Configuration> value);
+
+ /** A {@link DefaultValueFactory} which locates a Hadoop {@link Configuration}. */
+  class ConfigurationLocator implements DefaultValueFactory<List<Configuration>> {
+    @Override
+    public List<Configuration> create(PipelineOptions options) {
+ // TODO: Find default configuration to use
+ return null;
+ }
+ }
+}
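
As a usage sketch (not part of this commit; the host/port is a placeholder, and
parsing the JSON flag assumes HadoopFileSystemModule is on the classpath), the
options can be populated either programmatically or from the command-line form
shown in the @Description above:

    import com.google.common.collect.ImmutableList;
    import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.hadoop.conf.Configuration;

    public class HadoopOptionsSketch {
      public static void main(String[] args) {
        // Programmatic form: build the Configuration directly.
        HadoopFileSystemOptions options = PipelineOptionsFactory.as(HadoopFileSystemOptions.class);
        Configuration configuration = new Configuration(false);
        configuration.set("fs.default.name", "hdfs://localhost:9998");
        options.setHdfsConfiguration(ImmutableList.<Configuration>of(configuration));

        // Command-line form: the flag value is parsed by the Jackson module.
        HadoopFileSystemOptions fromArgs = PipelineOptionsFactory.fromArgs(
            "--hdfsConfiguration=[{\"fs.default.name\": \"hdfs://localhost:9998\"}]")
            .as(HadoopFileSystemOptions.class);
        System.out.println(fromArgs.getHdfsConfiguration().get(0).get("fs.default.name"));
      }
    }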
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java
new file mode 100644
index 0000000..344623b
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+
+/**
+ * {@link AutoService} registrar for {@link HadoopFileSystemOptions}.
+ */
+@AutoService(PipelineOptionsRegistrar.class)
+public class HadoopFileSystemOptionsRegistrar implements PipelineOptionsRegistrar {
+
+ @Override
+ public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+ return ImmutableList.<Class<? extends PipelineOptions>>of(HadoopFileSystemOptions.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
new file mode 100644
index 0000000..9159df3
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nonnull;
+import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.FileSystemRegistrar;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * {@link AutoService} registrar for the {@link HadoopFileSystem}.
+ */
+@AutoService(FileSystemRegistrar.class)
+public class HadoopFileSystemRegistrar implements FileSystemRegistrar {
+
+ @Override
+ public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) {
+ List<Configuration> configurations =
+ options.as(HadoopFileSystemOptions.class).getHdfsConfiguration();
+ if (configurations == null) {
+ configurations = Collections.emptyList();
+ }
+    checkArgument(configurations.size() <= 1,
+        "The %s currently only supports at most a single Hadoop configuration.",
+        HadoopFileSystemRegistrar.class.getSimpleName());
+
+ ImmutableList.Builder<FileSystem> builder = ImmutableList.builder();
+ for (Configuration configuration : configurations) {
+ try {
+ builder.add(new HadoopFileSystem(configuration));
+ } catch (IOException e) {
+ throw new IllegalArgumentException(String.format(
+ "Failed to construct Hadoop filesystem with configuration %s", configuration), e);
+ }
+ }
+ return builder.build();
+ }
+}
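
The @AutoService annotation generates the META-INF/services entry that makes
this registrar discoverable. A simplified sketch of the ServiceLoader lookup
that the Beam FileSystems machinery performs (illustrative only):

    import java.util.ServiceLoader;
    import org.apache.beam.sdk.io.FileSystemRegistrar;

    public class RegistrarDiscoverySketch {
      public static void main(String[] args) {
        // Each registrar on the classpath contributes FileSystems for its scheme(s).
        for (FileSystemRegistrar registrar : ServiceLoader.load(FileSystemRegistrar.class)) {
          System.out.println(registrar.getClass().getName());
        }
      }
    }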
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
new file mode 100644
index 0000000..e570864
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import java.net.URI;
+import java.util.Objects;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * {@link ResourceId} implementation for the {@link HadoopFileSystem}.
+ */
+class HadoopResourceId implements ResourceId {
+ private final URI uri;
+
+ HadoopResourceId(URI uri) {
+ this.uri = uri;
+ }
+
+  @Override
+  public ResourceId resolve(String other, ResolveOptions resolveOptions) {
+    // Resolution follows java.net.URI semantics; resolveOptions is not consulted.
+    return new HadoopResourceId(uri.resolve(other));
+  }
+
+ @Override
+ public ResourceId getCurrentDirectory() {
+ return new HadoopResourceId(uri.getPath().endsWith("/") ? uri : uri.resolve("."));
+ }
+
+ public boolean isDirectory() {
+ return uri.getPath().endsWith("/");
+ }
+
+ @Override
+ public String getFilename() {
+ return new Path(uri).getName();
+ }
+
+ @Override
+ public String getScheme() {
+ return uri.getScheme();
+ }
+
+ @Override
+ public String toString() {
+ return uri.toString();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof HadoopResourceId)) {
+ return false;
+ }
+ return Objects.equals(uri, ((HadoopResourceId) obj).uri);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(uri);
+ }
+
+ Path toPath() {
+ return new Path(uri);
+ }
+}
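
A resolution sketch for the class above (illustrative, not part of this commit;
this would be same-package code, since HadoopResourceId is package-private, and
the URIs are placeholders):

    import java.net.URI;
    import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
    import org.apache.beam.sdk.io.fs.ResourceId;

    public class HadoopResourceIdSketch {
      public static void main(String[] args) {
        HadoopResourceId dir = new HadoopResourceId(URI.create("hdfs://localhost:9998/tmp/"));

        // Resolution follows java.net.URI semantics against the directory URI.
        ResourceId file = dir.resolve("part-00000", StandardResolveOptions.RESOLVE_FILE);
        System.out.println(file);                       // hdfs://localhost:9998/tmp/part-00000
        System.out.println(file.getFilename());         // part-00000
        System.out.println(file.getCurrentDirectory()); // hdfs://localhost:9998/tmp/
      }
    }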