You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/04 19:53:29 UTC

[04/50] [abbrv] beam git commit: [BEAM-2135] Move hdfs to hadoop-file-system

[BEAM-2135] Move hdfs to hadoop-file-system


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bacd33c8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bacd33c8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bacd33c8

Branch: refs/heads/gearpump-runner
Commit: bacd33c81d99f4a3d1b11eb391a7f790087fc2a1
Parents: 3161904
Author: Luke Cwik <lc...@google.com>
Authored: Tue May 2 09:20:49 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue May 2 12:57:44 2017 -0700

----------------------------------------------------------------------
 sdks/java/io/hadoop-file-system/README.md       |  43 ++
 sdks/java/io/hadoop-file-system/pom.xml         | 195 ++++++
 .../apache/beam/sdk/io/hdfs/HDFSFileSink.java   | 478 ++++++++++++++
 .../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 625 +++++++++++++++++++
 .../beam/sdk/io/hdfs/HadoopFileSystem.java      | 240 +++++++
 .../sdk/io/hdfs/HadoopFileSystemModule.java     |  84 +++
 .../sdk/io/hdfs/HadoopFileSystemOptions.java    |  49 ++
 .../hdfs/HadoopFileSystemOptionsRegistrar.java  |  35 ++
 .../sdk/io/hdfs/HadoopFileSystemRegistrar.java  |  62 ++
 .../beam/sdk/io/hdfs/HadoopResourceId.java      |  81 +++
 .../java/org/apache/beam/sdk/io/hdfs/Sink.java  | 195 ++++++
 .../org/apache/beam/sdk/io/hdfs/UGIHelper.java  |  38 ++
 .../java/org/apache/beam/sdk/io/hdfs/Write.java | 585 +++++++++++++++++
 .../apache/beam/sdk/io/hdfs/package-info.java   |  22 +
 .../beam/sdk/io/hdfs/HDFSFileSinkTest.java      | 172 +++++
 .../beam/sdk/io/hdfs/HDFSFileSourceTest.java    | 231 +++++++
 .../sdk/io/hdfs/HadoopFileSystemModuleTest.java |  65 ++
 .../HadoopFileSystemOptionsRegistrarTest.java   |  49 ++
 .../io/hdfs/HadoopFileSystemOptionsTest.java    |  48 ++
 .../io/hdfs/HadoopFileSystemRegistrarTest.java  |  81 +++
 .../beam/sdk/io/hdfs/HadoopFileSystemTest.java  | 247 ++++++++
 sdks/java/io/hdfs/README.md                     |  43 --
 sdks/java/io/hdfs/pom.xml                       | 195 ------
 .../apache/beam/sdk/io/hdfs/HDFSFileSink.java   | 478 --------------
 .../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 625 -------------------
 .../beam/sdk/io/hdfs/HadoopFileSystem.java      | 240 -------
 .../sdk/io/hdfs/HadoopFileSystemModule.java     |  84 ---
 .../sdk/io/hdfs/HadoopFileSystemOptions.java    |  49 --
 .../hdfs/HadoopFileSystemOptionsRegistrar.java  |  35 --
 .../sdk/io/hdfs/HadoopFileSystemRegistrar.java  |  62 --
 .../beam/sdk/io/hdfs/HadoopResourceId.java      |  81 ---
 .../java/org/apache/beam/sdk/io/hdfs/Sink.java  | 195 ------
 .../org/apache/beam/sdk/io/hdfs/UGIHelper.java  |  38 --
 .../java/org/apache/beam/sdk/io/hdfs/Write.java | 585 -----------------
 .../apache/beam/sdk/io/hdfs/package-info.java   |  22 -
 .../beam/sdk/io/hdfs/HDFSFileSinkTest.java      | 172 -----
 .../beam/sdk/io/hdfs/HDFSFileSourceTest.java    | 231 -------
 .../sdk/io/hdfs/HadoopFileSystemModuleTest.java |  65 --
 .../HadoopFileSystemOptionsRegistrarTest.java   |  49 --
 .../io/hdfs/HadoopFileSystemOptionsTest.java    |  48 --
 .../io/hdfs/HadoopFileSystemRegistrarTest.java  |  81 ---
 .../beam/sdk/io/hdfs/HadoopFileSystemTest.java  | 247 --------
 sdks/java/io/pom.xml                            |   2 +-
 43 files changed, 3626 insertions(+), 3626 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/README.md
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/README.md b/sdks/java/io/hadoop-file-system/README.md
new file mode 100644
index 0000000..3a734f2
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/README.md
@@ -0,0 +1,43 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# HDFS IO
+
+This library provides HDFS sources and sinks to make it possible to read and
+write Apache Hadoop file formats from Apache Beam pipelines.
+
+Currently, only the read path is implemented. A `HDFSFileSource` allows any
+Hadoop `FileInputFormat` to be read as a `PCollection`.
+
+A `HDFSFileSource` can be read from using the
+`org.apache.beam.sdk.io.Read` transform. For example:
+
+```java
+HDFSFileSource<K, V> source = HDFSFileSource.from(path, MyInputFormat.class,
+  MyKey.class, MyValue.class);
+PCollection<KV<MyKey, MyValue>> records = pipeline.apply(Read.from(mySource));
+```
+
+Alternatively, the `readFrom` method is a convenience method that returns a read
+transform. For example:
+
+```java
+PCollection<KV<MyKey, MyValue>> records = pipeline.apply(HDFSFileSource.readFrom(path,
+  MyInputFormat.class, MyKey.class, MyValue.class));
+```

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/pom.xml b/sdks/java/io/hadoop-file-system/pom.xml
new file mode 100644
index 0000000..3ec9848
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/pom.xml
@@ -0,0 +1,195 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-io-parent</artifactId>
+    <version>0.7.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: Hadoop File System</name>
+  <description>Library to read and write Hadoop/HDFS file formats from Beam.</description>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemPropertyVariables>
+            <beamUseDummyRunner>false</beamUseDummyRunner>
+          </systemPropertyVariables>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <properties>
+    <!--
+      This is the version of Hadoop used to compile the hadoop-common module.
+      This dependency is defined with a provided scope.
+      Users must supply their own Hadoop version at runtime.
+    -->
+    <hadoop.version>2.7.3</hadoop.version>
+  </properties>
+
+  <dependencyManagement>
+    <!--
+       We define dependencies here instead of sdks/java/io because
+       of a version mimatch between this Hadoop version and other
+       Hadoop versions declared in other io submodules.
+    -->
+    <dependencies>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-hdfs</artifactId>
+        <classifier>tests</classifier>
+        <version>${hadoop.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-minicluster</artifactId>
+        <version>${hadoop.version}</version>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-hadoop-common</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.auto.service</groupId>
+      <artifactId>auto-service</artifactId>
+      <optional>true</optional>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-mapred</artifactId>
+      <version>${avro.version}</version>
+      <classifier>hadoop2</classifier>
+      <exclusions>
+        <!-- exclude old Jetty version of servlet API -->
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
new file mode 100644
index 0000000..aee73c4
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import java.util.Random;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroKeyOutputFormat;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+/**
+ * A {@link Sink} for writing records to a Hadoop filesystem using a Hadoop file-based
+ * output
+ * format.
+ *
+ * <p>To write a {@link org.apache.beam.sdk.values.PCollection} of elements of type T to Hadoop
+ * filesystem use {@link HDFSFileSink#to}, specify the path (this can be any Hadoop supported
+ * filesystem: HDFS, S3, GCS etc), the Hadoop {@link FileOutputFormat}, the key class K and the
+ * value class V and finally the {@link SerializableFunction} to map from T to {@link KV} of K
+ * and V.
+ *
+ * <p>{@code HDFSFileSink} can be used by {@link Write} to create write
+ * transform. See example below.
+ *
+ * <p>{@code HDFSFileSink} comes with helper methods to write text and Apache Avro. For example:
+ *
+ * <pre>
+ * {@code
+ * HDFSFileSink<CustomSpecificAvroClass, AvroKey<CustomSpecificAvroClass>, NullWritable> sink =
+ *   HDFSFileSink.toAvro(path, AvroCoder.of(CustomSpecificAvroClass.class));
+ * avroRecordsPCollection.apply(Write.to(sink));
+ * }
+ * </pre>
+ *
+ * @param <T> the type of elements of the input {@link org.apache.beam.sdk.values.PCollection}.
+ * @param <K> the type of keys to be written to the sink via {@link FileOutputFormat}.
+ * @param <V> the type of values to be written to the sink via {@link FileOutputFormat}.
+ */
+@AutoValue
+@Experimental
+public abstract class HDFSFileSink<T, K, V> extends Sink<T> {
+
+  private static final JobID jobId = new JobID(
+      Long.toString(System.currentTimeMillis()),
+      new Random().nextInt(Integer.MAX_VALUE));
+
+  public abstract String path();
+  public abstract Class<? extends FileOutputFormat<K, V>> formatClass();
+  public abstract Class<K> keyClass();
+  public abstract Class<V> valueClass();
+  public abstract SerializableFunction<T, KV<K, V>> outputConverter();
+  public abstract SerializableConfiguration serializableConfiguration();
+  public @Nullable abstract String username();
+  public abstract boolean validate();
+
+  // =======================================================================
+  // Factory methods
+  // =======================================================================
+
+  public static <T, K, V, W extends FileOutputFormat<K, V>> HDFSFileSink<T, K, V>
+  to(String path,
+     Class<W> formatClass,
+     Class<K> keyClass,
+     Class<V> valueClass,
+     SerializableFunction<T, KV<K, V>> outputConverter) {
+    return HDFSFileSink.<T, K, V>builder()
+        .setPath(path)
+        .setFormatClass(formatClass)
+        .setKeyClass(keyClass)
+        .setValueClass(valueClass)
+        .setOutputConverter(outputConverter)
+        .setConfiguration(null)
+        .setUsername(null)
+        .setValidate(true)
+        .build();
+  }
+
+  public static <T> HDFSFileSink<T, NullWritable, Text> toText(String path) {
+    SerializableFunction<T, KV<NullWritable, Text>> outputConverter =
+        new SerializableFunction<T, KV<NullWritable, Text>>() {
+          @Override
+          public KV<NullWritable, Text> apply(T input) {
+            return KV.of(NullWritable.get(), new Text(input.toString()));
+          }
+        };
+    return to(path, TextOutputFormat.class, NullWritable.class, Text.class, outputConverter);
+  }
+
+  /**
+   * Helper to create Avro sink given {@link AvroCoder}. Keep in mind that configuration
+   * object is altered to enable Avro output.
+   */
+  public static <T> HDFSFileSink<T, AvroKey<T>, NullWritable> toAvro(String path,
+                                                                     final AvroCoder<T> coder,
+                                                                     Configuration conf) {
+    SerializableFunction<T, KV<AvroKey<T>, NullWritable>> outputConverter =
+        new SerializableFunction<T, KV<AvroKey<T>, NullWritable>>() {
+          @Override
+          public KV<AvroKey<T>, NullWritable> apply(T input) {
+            return KV.of(new AvroKey<>(input), NullWritable.get());
+          }
+        };
+    conf.set("avro.schema.output.key", coder.getSchema().toString());
+    return to(
+        path,
+        AvroKeyOutputFormat.class,
+        (Class<AvroKey<T>>) (Class<?>) AvroKey.class,
+        NullWritable.class,
+        outputConverter).withConfiguration(conf);
+  }
+
+  /**
+   * Helper to create Avro sink given {@link Schema}. Keep in mind that configuration
+   * object is altered to enable Avro output.
+   */
+  public static HDFSFileSink<GenericRecord, AvroKey<GenericRecord>, NullWritable>
+  toAvro(String path, Schema schema, Configuration conf) {
+    return toAvro(path, AvroCoder.of(schema), conf);
+  }
+
+  /**
+   * Helper to create Avro sink given {@link Class}. Keep in mind that configuration
+   * object is altered to enable Avro output.
+   */
+  public static <T> HDFSFileSink<T, AvroKey<T>, NullWritable> toAvro(String path,
+                                                                     Class<T> cls,
+                                                                     Configuration conf) {
+    return toAvro(path, AvroCoder.of(cls), conf);
+  }
+
+  // =======================================================================
+  // Builder methods
+  // =======================================================================
+
+  public abstract Builder<T, K, V> toBuilder();
+  public static <T, K, V> Builder builder() {
+    return new AutoValue_HDFSFileSink.Builder<>();
+  }
+
+  /**
+   * AutoValue builder for {@link HDFSFileSink}.
+   */
+  @AutoValue.Builder
+  public abstract static class Builder<T, K, V> {
+    public abstract Builder<T, K, V> setPath(String path);
+    public abstract Builder<T, K, V> setFormatClass(
+        Class<? extends FileOutputFormat<K, V>> formatClass);
+    public abstract Builder<T, K, V> setKeyClass(Class<K> keyClass);
+    public abstract Builder<T, K, V> setValueClass(Class<V> valueClass);
+    public abstract Builder<T, K, V> setOutputConverter(
+        SerializableFunction<T, KV<K, V>> outputConverter);
+    public abstract Builder<T, K, V> setSerializableConfiguration(
+        SerializableConfiguration serializableConfiguration);
+    public Builder<T, K, V> setConfiguration(@Nullable Configuration configuration) {
+      if (configuration == null) {
+        configuration = new Configuration(false);
+      }
+      return this.setSerializableConfiguration(new SerializableConfiguration(configuration));
+    }
+    public abstract Builder<T, K, V> setUsername(String username);
+    public abstract Builder<T, K, V> setValidate(boolean validate);
+    public abstract HDFSFileSink<T, K, V> build();
+  }
+
+  public HDFSFileSink<T, K, V> withConfiguration(@Nullable Configuration configuration) {
+    return this.toBuilder().setConfiguration(configuration).build();
+  }
+
+  public HDFSFileSink<T, K, V> withUsername(@Nullable String username) {
+    return this.toBuilder().setUsername(username).build();
+  }
+
+  // =======================================================================
+  // Sink
+  // =======================================================================
+
+  @Override
+  public void validate(PipelineOptions options) {
+    if (validate()) {
+      try {
+        UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            FileSystem fs = FileSystem.get(new URI(path()),
+                SerializableConfiguration.newConfiguration(serializableConfiguration()));
+            checkState(!fs.exists(new Path(path())), "Output path %s already exists", path());
+            return null;
+          }
+        });
+      } catch (IOException | InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Override
+  public Sink.WriteOperation<T, String> createWriteOperation() {
+    return new HDFSWriteOperation<>(this, path(), formatClass());
+  }
+
+  private Job newJob() throws IOException {
+    Job job = SerializableConfiguration.newJob(serializableConfiguration());
+    job.setJobID(jobId);
+    job.setOutputKeyClass(keyClass());
+    job.setOutputValueClass(valueClass());
+    return job;
+  }
+
+  // =======================================================================
+  // WriteOperation
+  // =======================================================================
+
+  /** {{@link WriteOperation}} for HDFS. */
+  private static class HDFSWriteOperation<T, K, V> extends WriteOperation<T, String> {
+
+    private final HDFSFileSink<T, K, V> sink;
+    private final String path;
+    private final Class<? extends FileOutputFormat<K, V>> formatClass;
+
+    HDFSWriteOperation(HDFSFileSink<T, K, V> sink,
+                       String path,
+                       Class<? extends FileOutputFormat<K, V>> formatClass) {
+      this.sink = sink;
+      this.path = path;
+      this.formatClass = formatClass;
+    }
+
+    @Override
+    public void initialize(PipelineOptions options) throws Exception {
+      Job job = sink.newJob();
+      FileOutputFormat.setOutputPath(job, new Path(path));
+    }
+
+    @Override
+    public void setWindowedWrites(boolean windowedWrites) {
+    }
+
+    @Override
+    public void finalize(final Iterable<String> writerResults, PipelineOptions options)
+        throws Exception {
+      UGIHelper.getBestUGI(sink.username()).doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          doFinalize(writerResults);
+          return null;
+        }
+      });
+    }
+
+    private void doFinalize(Iterable<String> writerResults) throws Exception {
+      Job job = sink.newJob();
+      FileSystem fs = FileSystem.get(new URI(path), job.getConfiguration());
+      // If there are 0 output shards, just create output folder.
+      if (!writerResults.iterator().hasNext()) {
+        fs.mkdirs(new Path(path));
+        return;
+      }
+
+      // job successful
+      JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+      FileOutputCommitter outputCommitter = new FileOutputCommitter(new Path(path), context);
+      outputCommitter.commitJob(context);
+
+      // get actual output shards
+      Set<String> actual = Sets.newHashSet();
+      FileStatus[] statuses = fs.listStatus(new Path(path), new PathFilter() {
+        @Override
+        public boolean accept(Path path) {
+          String name = path.getName();
+          return !name.startsWith("_") && !name.startsWith(".");
+        }
+      });
+
+      // get expected output shards
+      Set<String> expected = Sets.newHashSet(writerResults);
+      checkState(
+          expected.size() == Lists.newArrayList(writerResults).size(),
+          "Data loss due to writer results hash collision");
+      for (FileStatus s : statuses) {
+        String name = s.getPath().getName();
+        int pos = name.indexOf('.');
+        actual.add(pos > 0 ? name.substring(0, pos) : name);
+      }
+
+      checkState(actual.equals(expected), "Writer results and output files do not match");
+
+      // rename output shards to Hadoop style, i.e. part-r-00000.txt
+      int i = 0;
+      for (FileStatus s : statuses) {
+        String name = s.getPath().getName();
+        int pos = name.indexOf('.');
+        String ext = pos > 0 ? name.substring(pos) : "";
+        fs.rename(
+            s.getPath(),
+            new Path(s.getPath().getParent(), String.format("part-r-%05d%s", i, ext)));
+        i++;
+      }
+    }
+
+    @Override
+    public Writer<T, String> createWriter(PipelineOptions options) throws Exception {
+      return new HDFSWriter<>(this, path, formatClass);
+    }
+
+    @Override
+    public Sink<T> getSink() {
+      return sink;
+    }
+
+    @Override
+    public Coder<String> getWriterResultCoder() {
+      return StringUtf8Coder.of();
+    }
+
+  }
+
+  // =======================================================================
+  // Writer
+  // =======================================================================
+
+  private static class HDFSWriter<T, K, V> extends Writer<T, String> {
+
+    private final HDFSWriteOperation<T, K, V> writeOperation;
+    private final String path;
+    private final Class<? extends FileOutputFormat<K, V>> formatClass;
+
+    // unique hash for each task
+    private int hash;
+
+    private TaskAttemptContext context;
+    private RecordWriter<K, V> recordWriter;
+    private FileOutputCommitter outputCommitter;
+
+    HDFSWriter(HDFSWriteOperation<T, K, V> writeOperation,
+               String path,
+               Class<? extends FileOutputFormat<K, V>> formatClass) {
+      this.writeOperation = writeOperation;
+      this.path = path;
+      this.formatClass = formatClass;
+    }
+
+    @Override
+    public void openWindowed(final String uId,
+                             BoundedWindow window,
+                             PaneInfo paneInfo,
+                             int shard,
+                             int numShards) throws Exception {
+      throw new UnsupportedOperationException("Windowing support not implemented yet for"
+          + "HDFS. Window " + window);
+    }
+
+    @Override
+    public void openUnwindowed(final String uId, int shard, int numShards) throws Exception {
+      UGIHelper.getBestUGI(writeOperation.sink.username()).doAs(
+          new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+              doOpen(uId);
+              return null;
+            }
+          }
+      );
+    }
+
+    private void doOpen(String uId) throws Exception {
+      this.hash = uId.hashCode();
+
+      Job job = writeOperation.sink.newJob();
+      FileOutputFormat.setOutputPath(job, new Path(path));
+
+      // Each Writer is responsible for writing one bundle of elements and is represented by one
+      // unique Hadoop task based on uId/hash. All tasks share the same job ID.
+      JobID jobId = job.getJobID();
+      TaskID taskId = new TaskID(jobId, TaskType.REDUCE, hash);
+      context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID(taskId, 0));
+
+      FileOutputFormat<K, V> outputFormat = formatClass.newInstance();
+      recordWriter = outputFormat.getRecordWriter(context);
+      outputCommitter = (FileOutputCommitter) outputFormat.getOutputCommitter(context);
+    }
+
+    @Override
+    public void write(T value) throws Exception {
+      checkNotNull(recordWriter,
+          "Record writer can't be null. Make sure to open Writer first!");
+      KV<K, V> kv = writeOperation.sink.outputConverter().apply(value);
+      recordWriter.write(kv.getKey(), kv.getValue());
+    }
+
+    @Override
+    public void cleanup() throws Exception {
+
+    }
+
+    @Override
+    public String close() throws Exception {
+      return UGIHelper.getBestUGI(writeOperation.sink.username()).doAs(
+          new PrivilegedExceptionAction<String>() {
+            @Override
+            public String run() throws Exception {
+              return doClose();
+            }
+          });
+    }
+
+    private String doClose() throws Exception {
+      // task/attempt successful
+      recordWriter.close(context);
+      outputCommitter.commitTask(context);
+
+      // result is prefix of the output file name
+      return String.format("part-r-%d", hash);
+    }
+
+    @Override
+    public WriteOperation<T, String> getWriteOperation() {
+      return writeOperation;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
new file mode 100644
index 0000000..5cc2097
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
@@ -0,0 +1,625 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.security.PrivilegedExceptionAction;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroKeyInputFormat;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
+import org.apache.beam.sdk.io.hadoop.WritableCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@code BoundedSource} for reading files resident in a Hadoop filesystem using a
+ * Hadoop file-based input format.
+ *
+ * <p>To read a {@link org.apache.beam.sdk.values.PCollection} of
+ * {@link org.apache.beam.sdk.values.KV} key-value pairs from one or more
+ * Hadoop files, use {@link HDFSFileSource#from} to specify the path(s) of the files to
+ * read, the Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}, the
+ * key class and the value class.
+ *
+ * <p>A {@code HDFSFileSource} can be read from using the
+ * {@link org.apache.beam.sdk.io.Read} transform. For example:
+ *
+ * <pre>
+ * {@code
+ * HDFSFileSource<K, V> source = HDFSFileSource.from(path, MyInputFormat.class,
+ *   MyKey.class, MyValue.class);
+ * PCollection<KV<MyKey, MyValue>> records = pipeline.apply(Read.from(mySource));
+ * }
+ * </pre>
+ *
+ * <p>Implementation note: Since Hadoop's
+ * {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}
+ * determines the input splits, this class extends {@link BoundedSource} rather than
+ * {@link org.apache.beam.sdk.io.OffsetBasedSource}, since the latter
+ * dictates input splits.
+ * @param <T> the type of elements of the result {@link org.apache.beam.sdk.values.PCollection}.
+ * @param <K> the type of keys to be read from the source via {@link FileInputFormat}.
+ * @param <V> the type of values to be read from the source via {@link FileInputFormat}.
+ */
+@AutoValue
+@Experimental
+public abstract class HDFSFileSource<T, K, V> extends BoundedSource<T> {
+  private static final long serialVersionUID = 0L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(HDFSFileSource.class);
+
+  public abstract String filepattern();
+  public abstract Class<? extends FileInputFormat<K, V>> formatClass();
+  public abstract Coder<T> coder();
+  public abstract SerializableFunction<KV<K, V>, T> inputConverter();
+  public abstract SerializableConfiguration serializableConfiguration();
+  public @Nullable abstract SerializableSplit serializableSplit();
+  public @Nullable abstract String username();
+  public abstract boolean validateSource();
+
+  // =======================================================================
+  // Factory methods
+  // =======================================================================
+
+  public static <T, K, V, W extends FileInputFormat<K, V>> HDFSFileSource<T, K, V>
+  from(String filepattern,
+       Class<W> formatClass,
+       Coder<T> coder,
+       SerializableFunction<KV<K, V>, T> inputConverter) {
+    return HDFSFileSource.<T, K, V>builder()
+        .setFilepattern(filepattern)
+        .setFormatClass(formatClass)
+        .setCoder(coder)
+        .setInputConverter(inputConverter)
+        .setConfiguration(null)
+        .setUsername(null)
+        .setValidateSource(true)
+        .setSerializableSplit(null)
+        .build();
+  }
+
+  public static <K, V, W extends FileInputFormat<K, V>> HDFSFileSource<KV<K, V>, K, V>
+  from(String filepattern,
+       Class<W> formatClass,
+       Class<K> keyClass,
+       Class<V> valueClass) {
+    KvCoder<K, V> coder = KvCoder.of(getDefaultCoder(keyClass), getDefaultCoder(valueClass));
+    SerializableFunction<KV<K, V>, KV<K, V>> inputConverter =
+        new SerializableFunction<KV<K, V>, KV<K, V>>() {
+          @Override
+          public KV<K, V> apply(KV<K, V> input) {
+            return input;
+          }
+        };
+    return HDFSFileSource.<KV<K, V>, K, V>builder()
+        .setFilepattern(filepattern)
+        .setFormatClass(formatClass)
+        .setCoder(coder)
+        .setInputConverter(inputConverter)
+        .setConfiguration(null)
+        .setUsername(null)
+        .setValidateSource(true)
+        .setSerializableSplit(null)
+        .build();
+  }
+
+  public static HDFSFileSource<String, LongWritable, Text>
+  fromText(String filepattern) {
+    SerializableFunction<KV<LongWritable, Text>, String> inputConverter =
+        new SerializableFunction<KV<LongWritable, Text>, String>() {
+      @Override
+      public String apply(KV<LongWritable, Text> input) {
+        return input.getValue().toString();
+      }
+    };
+    return from(filepattern, TextInputFormat.class, StringUtf8Coder.of(), inputConverter);
+  }
+
+  /**
+   * Helper to read from Avro source given {@link AvroCoder}. Keep in mind that configuration
+   * object is altered to enable Avro input.
+   */
+  public static <T> HDFSFileSource<T, AvroKey<T>, NullWritable>
+  fromAvro(String filepattern, final AvroCoder<T> coder, Configuration conf) {
+    Class<AvroKeyInputFormat<T>> formatClass = castClass(AvroKeyInputFormat.class);
+    SerializableFunction<KV<AvroKey<T>, NullWritable>, T> inputConverter =
+        new SerializableFunction<KV<AvroKey<T>, NullWritable>, T>() {
+          @Override
+          public T apply(KV<AvroKey<T>, NullWritable> input) {
+            try {
+              return CoderUtils.clone(coder, input.getKey().datum());
+            } catch (CoderException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        };
+    conf.set("avro.schema.input.key", coder.getSchema().toString());
+    return from(filepattern, formatClass, coder, inputConverter).withConfiguration(conf);
+  }
+
+  /**
+   * Helper to read from Avro source given {@link Schema}. Keep in mind that configuration
+   * object is altered to enable Avro input.
+   */
+  public static HDFSFileSource<GenericRecord, AvroKey<GenericRecord>, NullWritable>
+  fromAvro(String filepattern, Schema schema, Configuration conf) {
+    return fromAvro(filepattern, AvroCoder.of(schema), conf);
+  }
+
+  /**
+   * Helper to read from Avro source given {@link Class}. Keep in mind that configuration
+   * object is altered to enable Avro input.
+   */
+  public static <T> HDFSFileSource<T, AvroKey<T>, NullWritable>
+  fromAvro(String filepattern, Class<T> cls, Configuration conf) {
+    return fromAvro(filepattern, AvroCoder.of(cls), conf);
+  }
+
+  // =======================================================================
+  // Builder methods
+  // =======================================================================
+
+  public abstract HDFSFileSource.Builder<T, K, V> toBuilder();
+  public static <T, K, V> HDFSFileSource.Builder builder() {
+    return new AutoValue_HDFSFileSource.Builder<>();
+  }
+
+  /**
+   * AutoValue builder for {@link HDFSFileSource}.
+   */
+  @AutoValue.Builder
+  public abstract static class Builder<T, K, V> {
+    public abstract Builder<T, K, V> setFilepattern(String filepattern);
+    public abstract Builder<T, K, V> setFormatClass(
+        Class<? extends FileInputFormat<K, V>> formatClass);
+    public abstract Builder<T, K, V> setCoder(Coder<T> coder);
+    public abstract Builder<T, K, V> setInputConverter(
+        SerializableFunction<KV<K, V>, T> inputConverter);
+    public abstract Builder<T, K, V> setSerializableConfiguration(
+        SerializableConfiguration serializableConfiguration);
+    public Builder<T, K, V> setConfiguration(Configuration configuration) {
+      if (configuration == null) {
+        configuration = new Configuration(false);
+      }
+      return this.setSerializableConfiguration(new SerializableConfiguration(configuration));
+    }
+    public abstract Builder<T, K, V> setSerializableSplit(SerializableSplit serializableSplit);
+    public abstract Builder<T, K, V> setUsername(@Nullable String username);
+    public abstract Builder<T, K, V> setValidateSource(boolean validate);
+    public abstract HDFSFileSource<T, K, V> build();
+  }
+
+  public HDFSFileSource<T, K, V> withConfiguration(@Nullable Configuration configuration) {
+    return this.toBuilder().setConfiguration(configuration).build();
+  }
+
+  public HDFSFileSource<T, K, V> withUsername(@Nullable String username) {
+    return this.toBuilder().setUsername(username).build();
+  }
+
+  // =======================================================================
+  // BoundedSource
+  // =======================================================================
+
+  @Override
+  public List<? extends BoundedSource<T>> split(
+      final long desiredBundleSizeBytes,
+      PipelineOptions options) throws Exception {
+    if (serializableSplit() == null) {
+      List<InputSplit> inputSplits = UGIHelper.getBestUGI(username()).doAs(
+          new PrivilegedExceptionAction<List<InputSplit>>() {
+            @Override
+            public List<InputSplit> run() throws Exception {
+              return computeSplits(desiredBundleSizeBytes, serializableConfiguration());
+            }
+          });
+      return Lists.transform(inputSplits,
+          new Function<InputSplit, BoundedSource<T>>() {
+            @Override
+            public BoundedSource<T> apply(@Nullable InputSplit inputSplit) {
+              SerializableSplit serializableSplit = new SerializableSplit(inputSplit);
+              return HDFSFileSource.this.toBuilder()
+                  .setSerializableSplit(serializableSplit)
+                  .build();
+            }
+          });
+    } else {
+      return ImmutableList.of(this);
+    }
+  }
+
+  @Override
+  public long getEstimatedSizeBytes(PipelineOptions options) {
+    long size = 0;
+
+    try {
+      // If this source represents a split from split,
+      // then return the size of the split, rather then the entire input
+      if (serializableSplit() != null) {
+        return serializableSplit().getSplit().getLength();
+      }
+
+      size += UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Long>() {
+        @Override
+        public Long run() throws Exception {
+          long size = 0;
+          Job job = SerializableConfiguration.newJob(serializableConfiguration());
+          for (FileStatus st : listStatus(createFormat(job), job)) {
+            size += st.getLen();
+          }
+          return size;
+        }
+      });
+    } catch (IOException e) {
+      LOG.warn(
+          "Will estimate size of input to be 0 bytes. Can't estimate size of the input due to:", e);
+      // ignore, and return 0
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.warn(
+          "Will estimate size of input to be 0 bytes. Can't estimate size of the input due to:", e);
+      // ignore, and return 0
+    }
+    return size;
+  }
+
+  @Override
+  public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
+    this.validate();
+    return new HDFSFileReader<>(this, filepattern(), formatClass(), serializableSplit());
+  }
+
+  @Override
+  public void validate() {
+    if (validateSource()) {
+      try {
+        UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws Exception {
+                final Path pathPattern = new Path(filepattern());
+                FileSystem fs = FileSystem.get(pathPattern.toUri(),
+                    SerializableConfiguration.newConfiguration(serializableConfiguration()));
+                FileStatus[] fileStatuses = fs.globStatus(pathPattern);
+                checkState(
+                    fileStatuses != null && fileStatuses.length > 0,
+                    "Unable to find any files matching %s", filepattern());
+                  return null;
+                }
+              });
+      } catch (IOException | InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Override
+  public Coder<T> getDefaultOutputCoder() {
+    return coder();
+  }
+
+  // =======================================================================
+  // Helpers
+  // =======================================================================
+
+  private List<InputSplit> computeSplits(long desiredBundleSizeBytes,
+                                         SerializableConfiguration serializableConfiguration)
+      throws IOException, IllegalAccessException, InstantiationException {
+    Job job = SerializableConfiguration.newJob(serializableConfiguration);
+    FileInputFormat.setMinInputSplitSize(job, desiredBundleSizeBytes);
+    FileInputFormat.setMaxInputSplitSize(job, desiredBundleSizeBytes);
+    return createFormat(job).getSplits(job);
+  }
+
+  private FileInputFormat<K, V> createFormat(Job job)
+      throws IOException, IllegalAccessException, InstantiationException {
+    Path path = new Path(filepattern());
+    FileInputFormat.addInputPath(job, path);
+    return formatClass().newInstance();
+  }
+
+  private List<FileStatus> listStatus(FileInputFormat<K, V> format, Job job)
+      throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+    // FileInputFormat#listStatus is protected, so call using reflection
+    Method listStatus = FileInputFormat.class.getDeclaredMethod("listStatus", JobContext.class);
+    listStatus.setAccessible(true);
+    @SuppressWarnings("unchecked")
+    List<FileStatus> stat = (List<FileStatus>) listStatus.invoke(format, job);
+    return stat;
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> Coder<T> getDefaultCoder(Class<T> c) {
+    if (Writable.class.isAssignableFrom(c)) {
+      Class<? extends Writable> writableClass = (Class<? extends Writable>) c;
+      return (Coder<T>) WritableCoder.of(writableClass);
+    } else if (Void.class.equals(c)) {
+      return (Coder<T>) VoidCoder.of();
+    }
+    // TODO: how to use registered coders here?
+    throw new IllegalStateException("Cannot find coder for " + c);
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> Class<T> castClass(Class<?> aClass) {
+    return (Class<T>) aClass;
+  }
+
+  // =======================================================================
+  // BoundedReader
+  // =======================================================================
+
+  private static class HDFSFileReader<T, K, V> extends BoundedSource.BoundedReader<T> {
+
+    private final HDFSFileSource<T, K, V> source;
+    private final String filepattern;
+    private final Class<? extends FileInputFormat<K, V>> formatClass;
+    private final Job job;
+
+    private List<InputSplit> splits;
+    private ListIterator<InputSplit> splitsIterator;
+
+    private Configuration conf;
+    private FileInputFormat<?, ?> format;
+    private TaskAttemptContext attemptContext;
+    private RecordReader<K, V> currentReader;
+    private KV<K, V> currentPair;
+
+    HDFSFileReader(
+        HDFSFileSource<T, K, V> source,
+        String filepattern,
+        Class<? extends FileInputFormat<K, V>> formatClass,
+        SerializableSplit serializableSplit)
+        throws IOException {
+      this.source = source;
+      this.filepattern = filepattern;
+      this.formatClass = formatClass;
+      this.job = SerializableConfiguration.newJob(source.serializableConfiguration());
+
+      if (serializableSplit != null) {
+        this.splits = ImmutableList.of(serializableSplit.getSplit());
+        this.splitsIterator = splits.listIterator();
+      }
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      Path path = new Path(filepattern);
+      FileInputFormat.addInputPath(job, path);
+
+      conf = job.getConfiguration();
+      try {
+        format = formatClass.newInstance();
+      } catch (InstantiationException | IllegalAccessException e) {
+        throw new IOException("Cannot instantiate file input format " + formatClass, e);
+      }
+      attemptContext = new TaskAttemptContextImpl(conf, new TaskAttemptID());
+
+      if (splitsIterator == null) {
+        splits = format.getSplits(job);
+        splitsIterator = splits.listIterator();
+      }
+
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      try {
+        if (currentReader != null && currentReader.nextKeyValue()) {
+          currentPair = nextPair();
+          return true;
+        } else {
+          while (splitsIterator.hasNext()) {
+            // advance the reader and see if it has records
+            final InputSplit nextSplit = splitsIterator.next();
+            @SuppressWarnings("unchecked")
+            RecordReader<K, V> reader =
+                (RecordReader<K, V>) format.createRecordReader(nextSplit, attemptContext);
+            if (currentReader != null) {
+              currentReader.close();
+            }
+            currentReader = reader;
+            UGIHelper.getBestUGI(source.username()).doAs(new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws Exception {
+                currentReader.initialize(nextSplit, attemptContext);
+                return null;
+              }
+            });
+            if (currentReader.nextKeyValue()) {
+              currentPair = nextPair();
+              return true;
+            }
+            currentReader.close();
+            currentReader = null;
+          }
+          // either no next split or all readers were empty
+          currentPair = null;
+          return false;
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    public T getCurrent() throws NoSuchElementException {
+      if (currentPair == null) {
+        throw new NoSuchElementException();
+      }
+      return source.inputConverter().apply(currentPair);
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (currentReader != null) {
+        currentReader.close();
+        currentReader = null;
+      }
+      currentPair = null;
+    }
+
+    @Override
+    public BoundedSource<T> getCurrentSource() {
+      return source;
+    }
+
+    @SuppressWarnings("unchecked")
+    private KV<K, V> nextPair() throws IOException, InterruptedException {
+      K key = currentReader.getCurrentKey();
+      V value = currentReader.getCurrentValue();
+      // clone Writable objects since they are reused between calls to RecordReader#nextKeyValue
+      if (key instanceof Writable) {
+        key = (K) WritableUtils.clone((Writable) key, conf);
+      }
+      if (value instanceof Writable) {
+        value = (V) WritableUtils.clone((Writable) value, conf);
+      }
+      return KV.of(key, value);
+    }
+
+    // =======================================================================
+    // Optional overrides
+    // =======================================================================
+
+    @Override
+    public Double getFractionConsumed() {
+      if (currentReader == null) {
+        return 0.0;
+      }
+      if (splits.isEmpty()) {
+        return 1.0;
+      }
+      int index = splitsIterator.previousIndex();
+      int numReaders = splits.size();
+      if (index == numReaders) {
+        return 1.0;
+      }
+      double before = 1.0 * index / numReaders;
+      double after = 1.0 * (index + 1) / numReaders;
+      Double fractionOfCurrentReader = getProgress();
+      if (fractionOfCurrentReader == null) {
+        return before;
+      }
+      return before + fractionOfCurrentReader * (after - before);
+    }
+
+    private Double getProgress() {
+      try {
+        return (double) currentReader.getProgress();
+      } catch (IOException | InterruptedException e) {
+        return null;
+      }
+    }
+
+  }
+
+  // =======================================================================
+  // SerializableSplit
+  // =======================================================================
+
+  /**
+   * A wrapper to allow Hadoop {@link org.apache.hadoop.mapreduce.InputSplit}s to be
+   * serialized using Java's standard serialization mechanisms. Note that the InputSplit
+   * has to be Writable (which most are).
+   */
+  protected static class SerializableSplit implements Externalizable {
+    private static final long serialVersionUID = 0L;
+
+    private InputSplit split;
+
+    public SerializableSplit() {
+    }
+
+    public SerializableSplit(InputSplit split) {
+      checkArgument(split instanceof Writable, "Split is not writable: %s", split);
+      this.split = split;
+    }
+
+    public InputSplit getSplit() {
+      return split;
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+      out.writeUTF(split.getClass().getCanonicalName());
+      ((Writable) split).write(out);
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+      String className = in.readUTF();
+      try {
+        split = (InputSplit) Class.forName(className).newInstance();
+        ((Writable) split).readFields(in);
+      } catch (InstantiationException | IllegalAccessException e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
new file mode 100644
index 0000000..154a818
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.fs.CreateOptions;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.MatchResult.Status;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Adapts {@link org.apache.hadoop.fs.FileSystem} connectors to be used as
+ * Apache Beam {@link FileSystem FileSystems}.
+ *
+ * <p>The following HDFS FileSystem(s) are known to be unsupported:
+ * <ul>
+ *   <li>FTPFileSystem: Missing seek support within FTPInputStream</li>
+ * </ul>
+ *
+ * <p>This implementation assumes that the underlying Hadoop {@link FileSystem} is seek
+ * efficient when reading. The source code for the following {@link FSInputStream} implementations
+ * (as of Hadoop 2.7.1) do provide seek implementations:
+ * <ul>
+ *   <li>HarFsInputStream</li>
+ *   <li>S3InputStream</li>
+ *   <li>DFSInputStream</li>
+ *   <li>SwiftNativeInputStream</li>
+ *   <li>NativeS3FsInputStream</li>
+ *   <li>LocalFSFileInputStream</li>
+ *   <li>NativeAzureFsInputStream</li>
+ *   <li>S3AInputStream</li>
+ * </ul>
+ */
+class HadoopFileSystem extends FileSystem<HadoopResourceId> {
+  @VisibleForTesting
+  final org.apache.hadoop.fs.FileSystem fileSystem;
+
+  HadoopFileSystem(Configuration configuration) throws IOException {
+    this.fileSystem = org.apache.hadoop.fs.FileSystem.newInstance(configuration);
+  }
+
+  @Override
+  protected List<MatchResult> match(List<String> specs) {
+    ImmutableList.Builder<MatchResult> resultsBuilder = ImmutableList.builder();
+    for (String spec : specs) {
+      try {
+        FileStatus[] fileStatuses = fileSystem.globStatus(new Path(spec));
+        List<Metadata> metadata = new ArrayList<>();
+        for (FileStatus fileStatus : fileStatuses) {
+          if (fileStatus.isFile()) {
+            metadata.add(Metadata.builder()
+                .setResourceId(new HadoopResourceId(fileStatus.getPath().toUri()))
+                .setIsReadSeekEfficient(true)
+                .setSizeBytes(fileStatus.getLen())
+                .build());
+          }
+        }
+        resultsBuilder.add(MatchResult.create(Status.OK, metadata));
+      } catch (IOException e) {
+        resultsBuilder.add(MatchResult.create(Status.ERROR, e));
+      }
+    }
+    return resultsBuilder.build();
+  }
+
+  @Override
+  protected WritableByteChannel create(HadoopResourceId resourceId, CreateOptions createOptions)
+      throws IOException {
+    return Channels.newChannel(fileSystem.create(resourceId.toPath()));
+  }
+
+  @Override
+  protected ReadableByteChannel open(HadoopResourceId resourceId) throws IOException {
+    FileStatus fileStatus = fileSystem.getFileStatus(resourceId.toPath());
+    return new HadoopSeekableByteChannel(fileStatus, fileSystem.open(resourceId.toPath()));
+  }
+
+  @Override
+  protected void copy(
+      List<HadoopResourceId> srcResourceIds,
+      List<HadoopResourceId> destResourceIds) throws IOException {
+    for (int i = 0; i < srcResourceIds.size(); ++i) {
+      // Unfortunately HDFS FileSystems don't support a native copy operation so we are forced
+      // to use the inefficient implementation found in FileUtil which copies all the bytes through
+      // the local machine.
+      //
+      // HDFS FileSystem does define a concat method but could only find the DFSFileSystem
+      // implementing it. The DFSFileSystem implemented concat by deleting the srcs after which
+      // is not what we want. Also, all the other FileSystem implementations I saw threw
+      // UnsupportedOperationException within concat.
+      FileUtil.copy(
+          fileSystem, srcResourceIds.get(i).toPath(),
+          fileSystem, destResourceIds.get(i).toPath(),
+          false,
+          true,
+          fileSystem.getConf());
+    }
+  }
+
+  @Override
+  protected void rename(
+      List<HadoopResourceId> srcResourceIds,
+      List<HadoopResourceId> destResourceIds) throws IOException {
+    for (int i = 0; i < srcResourceIds.size(); ++i) {
+      fileSystem.rename(
+          srcResourceIds.get(i).toPath(),
+          destResourceIds.get(i).toPath());
+    }
+  }
+
+  @Override
+  protected void delete(Collection<HadoopResourceId> resourceIds) throws IOException {
+    for (HadoopResourceId resourceId : resourceIds) {
+      fileSystem.delete(resourceId.toPath(), false);
+    }
+  }
+
+  @Override
+  protected HadoopResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
+    try {
+      if (singleResourceSpec.endsWith("/") && !isDirectory) {
+        throw new IllegalArgumentException(String.format(
+            "Expected file path but received directory path %s", singleResourceSpec));
+      }
+      return !singleResourceSpec.endsWith("/") && isDirectory
+          ? new HadoopResourceId(new URI(singleResourceSpec + "/"))
+          : new HadoopResourceId(new URI(singleResourceSpec));
+    } catch (URISyntaxException e) {
+      throw new IllegalArgumentException(
+          String.format("Invalid spec %s directory %s", singleResourceSpec, isDirectory),
+          e);
+    }
+  }
+
+  @Override
+  protected String getScheme() {
+    return fileSystem.getScheme();
+  }
+
+  /** An adapter around {@link FSDataInputStream} that implements {@link SeekableByteChannel}. */
+  private static class HadoopSeekableByteChannel implements SeekableByteChannel {
+    private final FileStatus fileStatus;
+    private final FSDataInputStream inputStream;
+    private boolean closed;
+
+    private HadoopSeekableByteChannel(FileStatus fileStatus, FSDataInputStream inputStream) {
+      this.fileStatus = fileStatus;
+      this.inputStream = inputStream;
+      this.closed = false;
+    }
+
+    @Override
+    public int read(ByteBuffer dst) throws IOException {
+      if (closed) {
+        throw new IOException("Channel is closed");
+      }
+      return inputStream.read(dst);
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long position() throws IOException {
+      if (closed) {
+        throw new IOException("Channel is closed");
+      }
+      return inputStream.getPos();
+    }
+
+    @Override
+    public SeekableByteChannel position(long newPosition) throws IOException {
+      if (closed) {
+        throw new IOException("Channel is closed");
+      }
+      inputStream.seek(newPosition);
+      return this;
+    }
+
+    @Override
+    public long size() throws IOException {
+      if (closed) {
+        throw new IOException("Channel is closed");
+      }
+      return fileStatus.getLen();
+    }
+
+    @Override
+    public SeekableByteChannel truncate(long size) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isOpen() {
+      return !closed;
+    }
+
+    @Override
+    public void close() throws IOException {
+      closed = true;
+      inputStream.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java
new file mode 100644
index 0000000..2cb9d8a
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A Jackson {@link Module} that registers a {@link JsonSerializer} and {@link JsonDeserializer}
+ * for a Hadoop {@link Configuration}. The serialized representation is that of a JSON map.
+ *
+ * <p>Note that the serialization of the Hadoop {@link Configuration} only keeps the keys and their
+ * values dropping any configuration hierarchy and source information.
+ */
+@AutoService(Module.class)
+public class HadoopFileSystemModule extends SimpleModule {
+  public HadoopFileSystemModule() {
+    super("HadoopFileSystemModule");
+    setMixInAnnotation(Configuration.class, ConfigurationMixin.class);
+  }
+
+  /** A mixin class to add Jackson annotations to the Hadoop {@link Configuration} class. */
+  @JsonDeserialize(using = ConfigurationDeserializer.class)
+  @JsonSerialize(using = ConfigurationSerializer.class)
+  private static class ConfigurationMixin {}
+
+  /** A Jackson {@link JsonDeserializer} for Hadoop {@link Configuration} objects. */
+  static class ConfigurationDeserializer extends JsonDeserializer<Configuration> {
+    @Override
+    public Configuration deserialize(JsonParser jsonParser,
+        DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
+      Map<String, String> rawConfiguration =
+          jsonParser.readValueAs(new TypeReference<Map<String, String>>() {});
+      Configuration configuration = new Configuration(false);
+      for (Map.Entry<String, String> entry : rawConfiguration.entrySet()) {
+        configuration.set(entry.getKey(), entry.getValue());
+      }
+      return configuration;
+    }
+  }
+
+  /** A Jackson {@link JsonSerializer} for Hadoop {@link Configuration} objects. */
+  static class ConfigurationSerializer extends JsonSerializer<Configuration> {
+    @Override
+    public void serialize(Configuration configuration, JsonGenerator jsonGenerator,
+        SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
+      Map<String, String> map = new TreeMap<>();
+      for (Map.Entry<String, String> entry : configuration) {
+        map.put(entry.getKey(), entry.getValue());
+      }
+      jsonGenerator.writeObject(map);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
new file mode 100644
index 0000000..31250bc
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import java.util.List;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * {@link PipelineOptions} which encapsulate {@link Configuration Hadoop Configuration}
+ * for the {@link HadoopFileSystem}.
+ */
+public interface HadoopFileSystemOptions extends PipelineOptions {
+  @Description("A list of Hadoop configurations used to configure zero or more Hadoop filesystems. "
+      + "To specify on the command-line, represent the value as a JSON list of JSON maps, where "
+      + "each map represents the entire configuration for a single Hadoop filesystem. For example "
+      + "--hdfsConfiguration='[{\"fs.default.name\": \"hdfs://localhost:9998\", ...},"
+      + "{\"fs.default.name\": \"s3a://\", ...},...]'")
+  @Default.InstanceFactory(ConfigurationLocator.class)
+  List<Configuration> getHdfsConfiguration();
+  void setHdfsConfiguration(List<Configuration> value);
+
+  /** A {@link DefaultValueFactory} which locates a Hadoop {@link Configuration}. */
+  class ConfigurationLocator implements DefaultValueFactory<Configuration> {
+    @Override
+    public Configuration create(PipelineOptions options) {
+      // TODO: Find default configuration to use
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java
new file mode 100644
index 0000000..344623b
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+
+/**
+ * {@link AutoService} registrar for {@link HadoopFileSystemOptions}.
+ */
+@AutoService(PipelineOptionsRegistrar.class)
+public class HadoopFileSystemOptionsRegistrar implements PipelineOptionsRegistrar {
+
+  @Override
+  public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+    return ImmutableList.<Class<? extends PipelineOptions>>of(HadoopFileSystemOptions.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
new file mode 100644
index 0000000..9159df3
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nonnull;
+import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.FileSystemRegistrar;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * {@link AutoService} registrar for the {@link HadoopFileSystem}.
+ */
+@AutoService(FileSystemRegistrar.class)
+public class HadoopFileSystemRegistrar implements FileSystemRegistrar {
+
+  @Override
+  public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) {
+    List<Configuration> configurations =
+        options.as(HadoopFileSystemOptions.class).getHdfsConfiguration();
+    if (configurations == null) {
+      configurations = Collections.emptyList();
+    }
+    checkArgument(configurations.size() <= 1,
+        String.format(
+            "The %s currently only supports at most a single Hadoop configuration.",
+            HadoopFileSystemRegistrar.class.getSimpleName()));
+
+    ImmutableList.Builder<FileSystem> builder = ImmutableList.builder();
+    for (Configuration configuration : configurations) {
+      try {
+        builder.add(new HadoopFileSystem(configuration));
+      } catch (IOException e) {
+        throw new IllegalArgumentException(String.format(
+            "Failed to construct Hadoop filesystem with configuration %s", configuration), e);
+      }
+    }
+    return builder.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
new file mode 100644
index 0000000..e570864
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import java.net.URI;
+import java.util.Objects;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * {@link ResourceId} implementation for the {@link HadoopFileSystem}.
+ */
+class HadoopResourceId implements ResourceId {
+  private final URI uri;
+
+  HadoopResourceId(URI uri) {
+    this.uri = uri;
+  }
+
+  @Override
+  public ResourceId resolve(String other, ResolveOptions resolveOptions) {
+    return new HadoopResourceId(uri.resolve(other));
+  }
+
+  @Override
+  public ResourceId getCurrentDirectory() {
+    return new HadoopResourceId(uri.getPath().endsWith("/") ? uri : uri.resolve("."));
+  }
+
+  public boolean isDirectory() {
+    return uri.getPath().endsWith("/");
+  }
+
+  @Override
+  public String getFilename() {
+    return new Path(uri).getName();
+  }
+
+  @Override
+  public String getScheme() {
+    return uri.getScheme();
+  }
+
+  @Override
+  public String toString() {
+    return uri.toString();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof HadoopResourceId)) {
+      return false;
+    }
+    return Objects.equals(uri, ((HadoopResourceId) obj).uri);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(uri);
+  }
+
+  Path toPath() {
+    return new Path(uri);
+  }
+}