Posted to commits@beam.apache.org by lc...@apache.org on 2016/07/06 17:20:51 UTC

[45/50] [abbrv] incubator-beam git commit: Several improvements to HDFS/Hadoop interoperability

Several improvements to HDFS/Hadoop interoperability

* handle NullWritable in WritableCoder
* update Function handling in HDFSFileSource#splitIntoBundles
* add AvroHDFSFileSource
* add HDFSFileSink
* add SimpleAuth HDFS IO
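
A rough usage sketch of the headline additions (the Avro-generated MyRecord class, the paths,
and the wordCounts PCollection of KV<Text, LongWritable> are placeholders, not part of this
commit; Read and Write are the SDK's existing transforms):

  Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

  // New Avro source: elements arrive as KV<AvroKey<MyRecord>, NullWritable>.
  PCollection<KV<AvroKey<MyRecord>, NullWritable>> records = p.apply(Read.from(
      new AvroHDFSFileSource<MyRecord>("hdfs://namenode/events/*.avro",
          AvroCoder.of(MyRecord.class))));

  // New sink: writes KV pairs through a Hadoop FileOutputFormat (unchecked cast because
  // Hadoop format-class literals are raw).
  @SuppressWarnings("unchecked")
  Class<? extends FileOutputFormat<Text, LongWritable>> format =
      (Class<? extends FileOutputFormat<Text, LongWritable>>) (Class<?>) TextOutputFormat.class;
  wordCounts.apply(Write.to(new HDFSFileSink<Text, LongWritable>(
      "hdfs://namenode/output/counts", format)));

  p.run();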


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a17a8b2e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a17a8b2e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a17a8b2e

Branch: refs/heads/runners-spark2
Commit: a17a8b2e1a2279b58f32ab32b6b33522c7c4a65d
Parents: b09de0f
Author: Neville Li <ne...@spotify.com>
Authored: Mon Jun 27 15:51:17 2016 -0400
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jul 6 10:18:53 2016 -0700

----------------------------------------------------------------------
 sdks/java/io/hdfs/pom.xml                       |  24 ++
 .../beam/sdk/io/hdfs/AvroHDFSFileSource.java    | 145 ++++++++++
 .../beam/sdk/io/hdfs/AvroWrapperCoder.java      | 116 ++++++++
 .../apache/beam/sdk/io/hdfs/HDFSFileSink.java   | 277 +++++++++++++++++++
 .../apache/beam/sdk/io/hdfs/HDFSFileSource.java |  40 +--
 .../apache/beam/sdk/io/hdfs/WritableCoder.java  |   9 +-
 .../SimpleAuthAvroHDFSFileSource.java           |  84 ++++++
 .../hdfs/simpleauth/SimpleAuthHDFSFileSink.java | 132 +++++++++
 .../simpleauth/SimpleAuthHDFSFileSource.java    | 122 ++++++++
 .../beam/sdk/io/hdfs/AvroWrapperCoderTest.java  |  52 ++++
 .../beam/sdk/io/hdfs/WritableCoderTest.java     |   9 +
 11 files changed, 989 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a17a8b2e/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
index 2e427b1..42175d5 100644
--- a/sdks/java/io/hdfs/pom.xml
+++ b/sdks/java/io/hdfs/pom.xml
@@ -82,6 +82,30 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-mapred</artifactId>
+      <version>${avro.version}</version>
+      <classifier>hadoop2</classifier>
+      <exclusions>
+        <!-- exclude old Jetty version of servlet API -->
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
       <version>2.7.0</version>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a17a8b2e/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroHDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroHDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroHDFSFileSource.java
new file mode 100644
index 0000000..9dc926b
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroHDFSFileSource.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.KV;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroKeyInputFormat;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+import java.io.IOException;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * A {@code BoundedSource} for reading Avro files resident in a Hadoop filesystem.
+ *
+ * @param <T> The type of the Avro records to be read from the source.
+ */
+public class AvroHDFSFileSource<T> extends HDFSFileSource<AvroKey<T>, NullWritable> {
+  private static final long serialVersionUID = 0L;
+
+  protected final AvroCoder<T> avroCoder;
+  private final String schemaStr;
+
+  public AvroHDFSFileSource(String filepattern, AvroCoder<T> avroCoder) {
+    this(filepattern, avroCoder, null);
+  }
+
+  public AvroHDFSFileSource(String filepattern,
+                            AvroCoder<T> avroCoder,
+                            HDFSFileSource.SerializableSplit serializableSplit) {
+    super(filepattern,
+        ClassUtil.<AvroKeyInputFormat<T>>castClass(AvroKeyInputFormat.class),
+        ClassUtil.<AvroKey<T>>castClass(AvroKey.class),
+        NullWritable.class, serializableSplit);
+    this.avroCoder = avroCoder;
+    this.schemaStr = avroCoder.getSchema().toString();
+  }
+
+  @Override
+  public Coder<KV<AvroKey<T>, NullWritable>> getDefaultOutputCoder() {
+    AvroWrapperCoder<AvroKey<T>, T> keyCoder = AvroWrapperCoder.of(this.getKeyClass(), avroCoder);
+    return KvCoder.of(keyCoder, WritableCoder.of(NullWritable.class));
+  }
+
+  @Override
+  public List<? extends AvroHDFSFileSource<T>> splitIntoBundles(
+      long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+    if (serializableSplit == null) {
+      return Lists.transform(computeSplits(desiredBundleSizeBytes),
+          new Function<InputSplit, AvroHDFSFileSource<T>>() {
+            @Override
+            public AvroHDFSFileSource<T> apply(@Nullable InputSplit inputSplit) {
+              return new AvroHDFSFileSource<>(
+                  filepattern, avroCoder, new SerializableSplit(inputSplit));
+            }
+          });
+    } else {
+      return ImmutableList.of(this);
+    }
+  }
+
+  @Override
+  public BoundedReader<KV<AvroKey<T>, NullWritable>> createReader(PipelineOptions options)
+      throws IOException {
+    this.validate();
+
+    Schema schema = new Schema.Parser().parse(schemaStr);
+    if (serializableSplit == null) {
+      return new AvroHDFSFileReader<>(this, filepattern, formatClass, schema);
+    } else {
+      return new AvroHDFSFileReader<>(this, filepattern, formatClass, schema,
+          serializableSplit.getSplit());
+    }
+  }
+
+  static class AvroHDFSFileReader<T> extends HDFSFileReader<AvroKey<T>, NullWritable> {
+    public AvroHDFSFileReader(BoundedSource<KV<AvroKey<T>, NullWritable>> source,
+                              String filepattern,
+                              Class<? extends FileInputFormat<?, ?>> formatClass,
+                              Schema schema) throws IOException {
+      this(source, filepattern, formatClass, schema, null);
+    }
+
+    public AvroHDFSFileReader(BoundedSource<KV<AvroKey<T>, NullWritable>> source,
+                              String filepattern,
+                              Class<? extends FileInputFormat<?, ?>> formatClass,
+                              Schema schema, InputSplit split) throws IOException {
+      super(source, filepattern, formatClass, split);
+      AvroJob.setInputKeySchema(job, schema);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected KV<AvroKey<T>, NullWritable> nextPair() throws IOException, InterruptedException {
+      AvroKey<T> key = currentReader.getCurrentKey();
+      NullWritable value = currentReader.getCurrentValue();
+
+      // clone the record to work around identical element issue due to object reuse
+      Coder<T> avroCoder = ((AvroHDFSFileSource<T>) this.getCurrentSource()).avroCoder;
+      key = new AvroKey<>(CoderUtils.clone(avroCoder, key.datum()));
+
+      return KV.of(key, value);
+    }
+
+  }
+
+  static class ClassUtil {
+    @SuppressWarnings("unchecked")
+    static <T> Class<T> castClass(Class<?> aClass) {
+      return (Class<T>) aClass;
+    }
+  }
+
+}
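
For reference, a minimal read sketch against this source (MyRecord, the path, and the
Pipeline p are placeholders, not part of this commit):

  AvroHDFSFileSource<MyRecord> source = new AvroHDFSFileSource<>(
      "hdfs://namenode/events/*.avro", AvroCoder.of(MyRecord.class));

  // Elements are KV<AvroKey<MyRecord>, NullWritable>; getDefaultOutputCoder() pairs an
  // AvroWrapperCoder for the key with a WritableCoder for the NullWritable value.
  PCollection<KV<AvroKey<MyRecord>, NullWritable>> records = p.apply(Read.from(source));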

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a17a8b2e/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
new file mode 100644
index 0000000..a831afe
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.PropertyNames;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.avro.mapred.AvroWrapper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * An {@code AvroWrapperCoder} is a {@link Coder} for a Java class that implements {@link
+ * AvroWrapper}.
+ *
+ * @param <WrapperT> the type of the wrapper
+ * @param <DatumT> the type of the datum
+ */
+public class AvroWrapperCoder<WrapperT extends AvroWrapper<DatumT>, DatumT>
+    extends StandardCoder<WrapperT> {
+  private static final long serialVersionUID = 0L;
+
+  private final Class<WrapperT> wrapperType;
+  private final AvroCoder<DatumT> datumCoder;
+
+  private AvroWrapperCoder(Class<WrapperT> wrapperType, AvroCoder<DatumT> datumCoder) {
+    this.wrapperType = wrapperType;
+    this.datumCoder = datumCoder;
+  }
+
+  /**
+   * Returns an {@code AvroWrapperCoder} instance for the provided element class.
+   * @param <WrapperT> the type of the wrapper
+   * @param <DatumT> the type of the datum
+   */
+  public static <WrapperT extends AvroWrapper<DatumT>, DatumT>
+  AvroWrapperCoder<WrapperT, DatumT> of(Class<WrapperT> wrapperType, AvroCoder<DatumT> datumCoder) {
+    return new AvroWrapperCoder<>(wrapperType, datumCoder);
+  }
+
+  @JsonCreator
+  @SuppressWarnings("unchecked")
+  public static AvroWrapperCoder<?, ?> of(
+      @JsonProperty("wrapperType") String wrapperType,
+      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components)
+      throws ClassNotFoundException {
+    Class<?> clazz = Class.forName(wrapperType);
+    if (!AvroWrapper.class.isAssignableFrom(clazz)) {
+      throw new ClassNotFoundException(
+          "Class " + wrapperType + " does not implement AvroWrapper");
+    }
+    checkArgument(components.size() == 1, "Expecting 1 component, got " + components.size());
+    return of((Class<? extends AvroWrapper>) clazz, (AvroCoder<?>) components.get(0));
+  }
+
+  @Override
+  public void encode(WrapperT value, OutputStream outStream, Context context) throws IOException {
+    datumCoder.encode(value.datum(), outStream, context);
+  }
+
+  @Override
+  public WrapperT decode(InputStream inStream, Context context) throws IOException {
+    try {
+      WrapperT wrapper = wrapperType.newInstance();
+      wrapper.datum(datumCoder.decode(inStream, context));
+      return wrapper;
+    } catch (InstantiationException | IllegalAccessException e) {
+      throw new CoderException("unable to deserialize record", e);
+    }
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return Collections.singletonList(datumCoder);
+  }
+
+  @Override
+  public CloudObject asCloudObject() {
+    CloudObject result = super.asCloudObject();
+    result.put("wrapperType", wrapperType.getName());
+    return result;
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    datumCoder.verifyDeterministic();
+  }
+
+}
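
A sketch of attaching the coder by hand to a PCollection named keyed (the unchecked cast
mirrors the package-private ClassUtil helper above; MyRecord and keyed are placeholders):

  @SuppressWarnings("unchecked")
  Class<AvroKey<MyRecord>> keyClass = (Class<AvroKey<MyRecord>>) (Class<?>) AvroKey.class;

  // KvCoder combines the Avro datum coder for the key with the Writable coder for the value.
  keyed.setCoder(KvCoder.of(
      AvroWrapperCoder.of(keyClass, AvroCoder.of(MyRecord.class)),
      WritableCoder.of(NullWritable.class)));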

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a17a8b2e/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
new file mode 100644
index 0000000..688447a
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.Sink;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.values.KV;
+
+import com.google.api.client.util.Maps;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * A {@code Sink} for writing records to a Hadoop filesystem using a Hadoop file-based output
+ * format.
+ *
+ * @param <K> The type of keys to be written to the sink.
+ * @param <V> The type of values to be written to the sink.
+ */
+public class HDFSFileSink<K, V> extends Sink<KV<K, V>> {
+
+  private static final JobID jobId = new JobID(
+      Long.toString(System.currentTimeMillis()),
+      new Random().nextInt(Integer.MAX_VALUE));
+
+  protected final String path;
+  protected final Class<? extends FileOutputFormat<K, V>> formatClass;
+
+  // workaround to make Configuration serializable
+  private final Map<String, String> map;
+
+  public HDFSFileSink(String path, Class<? extends FileOutputFormat<K, V>> formatClass) {
+    this.path = path;
+    this.formatClass = formatClass;
+    this.map = Maps.newHashMap();
+  }
+
+  public HDFSFileSink(String path, Class<? extends FileOutputFormat<K, V>> formatClass,
+                      Configuration conf) {
+    this(path, formatClass);
+    // serialize conf to map
+    for (Map.Entry<String, String> entry : conf) {
+      map.put(entry.getKey(), entry.getValue());
+    }
+  }
+
+  @Override
+  public void validate(PipelineOptions options) {
+    try {
+      Job job = jobInstance();
+      FileSystem fs = FileSystem.get(job.getConfiguration());
+      checkState(!fs.exists(new Path(path)), "Output path " + path + " already exists");
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Sink.WriteOperation<KV<K, V>, ?> createWriteOperation(PipelineOptions options) {
+    return new HDFSWriteOperation<>(this, path, formatClass);
+  }
+
+  private Job jobInstance() throws IOException {
+    Job job = Job.getInstance();
+    // deserialize map to conf
+    Configuration conf = job.getConfiguration();
+    for (Map.Entry<String, String> entry : map.entrySet()) {
+      conf.set(entry.getKey(), entry.getValue());
+    }
+    job.setJobID(jobId);
+    return job;
+  }
+
+  // =======================================================================
+  // WriteOperation
+  // =======================================================================
+
+  /** {@link WriteOperation} for HDFS. */
+  public static class HDFSWriteOperation<K, V> extends WriteOperation<KV<K, V>, String> {
+
+    private final Sink<KV<K, V>> sink;
+    protected final String path;
+    protected final Class<? extends FileOutputFormat<K, V>> formatClass;
+
+    public HDFSWriteOperation(Sink<KV<K, V>> sink,
+                              String path,
+                              Class<? extends FileOutputFormat<K, V>> formatClass) {
+      this.sink = sink;
+      this.path = path;
+      this.formatClass = formatClass;
+    }
+
+    @Override
+    public void initialize(PipelineOptions options) throws Exception {
+      Job job = ((HDFSFileSink<K, V>) getSink()).jobInstance();
+      FileOutputFormat.setOutputPath(job, new Path(path));
+    }
+
+    @Override
+    public void finalize(Iterable<String> writerResults, PipelineOptions options) throws Exception {
+      Job job = ((HDFSFileSink<K, V>) getSink()).jobInstance();
+      FileSystem fs = FileSystem.get(job.getConfiguration());
+
+      // If there are 0 output shards, just create output folder.
+      if (!writerResults.iterator().hasNext()) {
+        fs.mkdirs(new Path(path));
+        return;
+      }
+
+      // job successful
+      JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+      FileOutputCommitter outputCommitter = new FileOutputCommitter(new Path(path), context);
+      outputCommitter.commitJob(context);
+
+      // get actual output shards
+      Set<String> actual = Sets.newHashSet();
+      FileStatus[] statuses = fs.listStatus(new Path(path), new PathFilter() {
+        @Override
+        public boolean accept(Path path) {
+          String name = path.getName();
+          return !name.startsWith("_") && !name.startsWith(".");
+        }
+      });
+
+      // get expected output shards
+      Set<String> expected = Sets.newHashSet(writerResults);
+      checkState(
+          expected.size() == Lists.newArrayList(writerResults).size(),
+          "Data loss due to writer results hash collision");
+      for (FileStatus s : statuses) {
+        String name = s.getPath().getName();
+        int pos = name.indexOf('.');
+        actual.add(pos > 0 ? name.substring(0, pos) : name);
+      }
+
+      checkState(actual.equals(expected), "Writer results and output files do not match");
+
+      // rename output shards to the Hadoop naming style, e.g. part-r-00000.txt
+      int i = 0;
+      for (FileStatus s : statuses) {
+        String name = s.getPath().getName();
+        int pos = name.indexOf('.');
+        String ext = pos > 0 ? name.substring(pos) : "";
+        fs.rename(
+            s.getPath(),
+            new Path(s.getPath().getParent(), String.format("part-r-%05d%s", i, ext)));
+        i++;
+      }
+    }
+
+    @Override
+    public Writer<KV<K, V>, String> createWriter(PipelineOptions options) throws Exception {
+      return new HDFSWriter<>(this, path, formatClass);
+    }
+
+    @Override
+    public Sink<KV<K, V>> getSink() {
+      return sink;
+    }
+
+    @Override
+    public Coder<String> getWriterResultCoder() {
+      return StringUtf8Coder.of();
+    }
+
+  }
+
+  // =======================================================================
+  // Writer
+  // =======================================================================
+
+  /** {@link Writer} for HDFS files. */
+  public static class HDFSWriter<K, V> extends Writer<KV<K, V>, String> {
+
+    private final HDFSWriteOperation<K, V> writeOperation;
+    private final String path;
+    private final Class<? extends FileOutputFormat<K, V>> formatClass;
+
+    // unique hash for each task
+    private int hash;
+
+    private TaskAttemptContext context;
+    private RecordWriter<K, V> recordWriter;
+    private FileOutputCommitter outputCommitter;
+
+    public HDFSWriter(HDFSWriteOperation<K, V> writeOperation,
+                      String path,
+                      Class<? extends FileOutputFormat<K, V>> formatClass) {
+      this.writeOperation = writeOperation;
+      this.path = path;
+      this.formatClass = formatClass;
+    }
+
+    @Override
+    public void open(String uId) throws Exception {
+      this.hash = uId.hashCode();
+
+      Job job = ((HDFSFileSink<K, V>) getWriteOperation().getSink()).jobInstance();
+      FileOutputFormat.setOutputPath(job, new Path(path));
+
+      // Each Writer is responsible for writing one bundle of elements and is represented by one
+      // unique Hadoop task based on uId/hash. All tasks share the same job ID. Since Dataflow
+      // handles retrying of failed bundles, each task has one attempt only.
+      JobID jobId = job.getJobID();
+      TaskID taskId = new TaskID(jobId, TaskType.REDUCE, hash);
+      context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID(taskId, 0));
+
+      FileOutputFormat<K, V> outputFormat = formatClass.newInstance();
+      recordWriter = outputFormat.getRecordWriter(context);
+      outputCommitter = (FileOutputCommitter) outputFormat.getOutputCommitter(context);
+    }
+
+    @Override
+    public void write(KV<K, V> value) throws Exception {
+      recordWriter.write(value.getKey(), value.getValue());
+    }
+
+    @Override
+    public String close() throws Exception {
+      // task/attempt successful
+      recordWriter.close(context);
+      outputCommitter.commitTask(context);
+
+      // result is prefix of the output file name
+      return String.format("part-r-%d", hash);
+    }
+
+    @Override
+    public WriteOperation<KV<K, V>, String> getWriteOperation() {
+      return writeOperation;
+    }
+
+  }
+
+}
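
A write-side sketch (the output path, the wordCounts PCollection of KV<Text, LongWritable>,
and TextOutputFormat are placeholders):

  Configuration conf = new Configuration();
  @SuppressWarnings("unchecked")
  Class<? extends FileOutputFormat<Text, LongWritable>> format =
      (Class<? extends FileOutputFormat<Text, LongWritable>>) (Class<?>) TextOutputFormat.class;

  // validate() refuses to start if the output path already exists; finalize() commits the
  // Hadoop job and renames shards to the part-r-NNNNN naming.
  wordCounts.apply(Write.to(
      new HDFSFileSink<Text, LongWritable>("hdfs://namenode/output/counts", format, conf)));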

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a17a8b2e/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
index 7a0545d..de68565 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
@@ -100,11 +100,11 @@ import javax.annotation.Nullable;
 public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
   private static final long serialVersionUID = 0L;
 
-  private final String filepattern;
-  private final Class<? extends FileInputFormat<?, ?>> formatClass;
-  private final Class<K> keyClass;
-  private final Class<V> valueClass;
-  private final SerializableSplit serializableSplit;
+  protected final String filepattern;
+  protected final Class<? extends FileInputFormat<?, ?>> formatClass;
+  protected final Class<K> keyClass;
+  protected final Class<V> valueClass;
+  protected final SerializableSplit serializableSplit;
 
   /**
    * Creates a {@code Read} transform that will read from an {@code HDFSFileSource}
@@ -133,9 +133,9 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
   /**
    * Create a {@code HDFSFileSource} based on a file or a file pattern specification.
    */
-  private HDFSFileSource(String filepattern,
-                         Class<? extends FileInputFormat<?, ?>> formatClass, Class<K> keyClass,
-                         Class<V> valueClass) {
+  protected HDFSFileSource(String filepattern,
+                           Class<? extends FileInputFormat<?, ?>> formatClass, Class<K> keyClass,
+                           Class<V> valueClass) {
     this(filepattern, formatClass, keyClass, valueClass, null);
   }
 
@@ -143,9 +143,9 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
    * Create a {@code HDFSFileSource} based on a single Hadoop input split, which won't be
    * split up further.
    */
-  private HDFSFileSource(String filepattern,
-                         Class<? extends FileInputFormat<?, ?>> formatClass, Class<K> keyClass,
-                         Class<V> valueClass, SerializableSplit serializableSplit) {
+  protected HDFSFileSource(String filepattern,
+                           Class<? extends FileInputFormat<?, ?>> formatClass, Class<K> keyClass,
+                           Class<V> valueClass, SerializableSplit serializableSplit) {
     this.filepattern = filepattern;
     this.formatClass = formatClass;
     this.keyClass = keyClass;
@@ -183,9 +183,9 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
     if (serializableSplit == null) {
       return Lists.transform(computeSplits(desiredBundleSizeBytes),
           new Function<InputSplit, BoundedSource<KV<K, V>>>() {
-        @Nullable @Override
+        @Override
         public BoundedSource<KV<K, V>> apply(@Nullable InputSplit inputSplit) {
-          return new HDFSFileSource<K, V>(filepattern, formatClass, keyClass,
+          return new HDFSFileSource<>(filepattern, formatClass, keyClass,
               valueClass, new SerializableSplit(inputSplit));
         }
       });
@@ -201,7 +201,7 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
     return formatClass.newInstance();
   }
 
-  private List<InputSplit> computeSplits(long desiredBundleSizeBytes) throws IOException,
+  protected List<InputSplit> computeSplits(long desiredBundleSizeBytes) throws IOException,
       IllegalAccessException, InstantiationException {
     Job job = Job.getInstance();
     FileInputFormat.setMinInputSplitSize(job, desiredBundleSizeBytes);
@@ -276,13 +276,14 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
     private final BoundedSource<KV<K, V>> source;
     private final String filepattern;
     private final Class formatClass;
+    protected Job job;
 
     private FileInputFormat<?, ?> format;
     private TaskAttemptContext attemptContext;
     private List<InputSplit> splits;
     private ListIterator<InputSplit> splitsIterator;
     private Configuration conf;
-    private RecordReader<K, V> currentReader;
+    protected RecordReader<K, V> currentReader;
     private KV<K, V> currentPair;
     private volatile boolean done = false;
 
@@ -290,7 +291,7 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
      * Create a {@code HDFSFileReader} based on a file or a file pattern specification.
      */
     public HDFSFileReader(BoundedSource<KV<K, V>> source, String filepattern,
-                          Class<? extends FileInputFormat<?, ?>> formatClass) {
+                          Class<? extends FileInputFormat<?, ?>> formatClass) throws IOException {
       this(source, filepattern, formatClass, null);
     }
 
@@ -298,7 +299,8 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
      * Create a {@code HDFSFileReader} based on a single Hadoop input split.
      */
     public HDFSFileReader(BoundedSource<KV<K, V>> source, String filepattern,
-                          Class<? extends FileInputFormat<?, ?>> formatClass, InputSplit split) {
+                          Class<? extends FileInputFormat<?, ?>> formatClass, InputSplit split)
+            throws IOException {
       this.source = source;
       this.filepattern = filepattern;
       this.formatClass = formatClass;
@@ -306,11 +308,11 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
         this.splits = ImmutableList.of(split);
         this.splitsIterator = splits.listIterator();
       }
+      this.job = Job.getInstance(); // new instance
     }
 
     @Override
     public boolean start() throws IOException {
-      Job job = Job.getInstance(); // new instance
       Path path = new Path(filepattern);
       FileInputFormat.addInputPath(job, path);
 
@@ -369,7 +371,7 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
     }
 
     @SuppressWarnings("unchecked")
-    private KV<K, V> nextPair() throws IOException, InterruptedException {
+    protected KV<K, V> nextPair() throws IOException, InterruptedException {
       K key = currentReader.getCurrentKey();
       V value = currentReader.getCurrentValue();
       // clone Writable objects since they are reused between calls to RecordReader#nextKeyValue

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a17a8b2e/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
index 814a762..4e913ed 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
@@ -23,6 +23,7 @@ import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.util.CloudObject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -32,8 +33,7 @@ import java.io.OutputStream;
 import java.util.List;
 
 /**
- * A {@code WritableCoder} is a {@link org.apache.beam.sdk.coders.Coder} for a
- * Java class that implements {@link org.apache.hadoop.io.Writable}.
+ * A {@code WritableCoder} is a {@link Coder} for a Java class that implements {@link Writable}.
  *
  * <p> To use, specify the coder type on a PCollection:
  * <pre>
@@ -79,9 +79,14 @@ public class WritableCoder<T extends Writable> extends StandardCoder<T> {
     value.write(new DataOutputStream(outStream));
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public T decode(InputStream inStream, Context context) throws IOException {
     try {
+      if (type == NullWritable.class) {
+        // NullWritable has no default constructor
+        return (T) NullWritable.get();
+      }
       T t = type.newInstance();
       t.readFields(new DataInputStream(inStream));
       return t;
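
The NullWritable special case matters because the class only exposes a private constructor,
so type.newInstance() would fail; decode now returns the singleton instead. A round trip,
e.g. inside a test method that declares throws Exception (CoderUtils is the SDK helper):

  WritableCoder<NullWritable> coder = WritableCoder.of(NullWritable.class);
  byte[] bytes = CoderUtils.encodeToByteArray(coder, NullWritable.get());  // empty payload
  NullWritable decoded = CoderUtils.decodeFromByteArray(coder, bytes);     // NullWritable.get()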

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a17a8b2e/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthAvroHDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthAvroHDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthAvroHDFSFileSource.java
new file mode 100644
index 0000000..5dd9673
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthAvroHDFSFileSource.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs.simpleauth;
+
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.io.hdfs.AvroHDFSFileSource;
+import org.apache.beam.sdk.io.hdfs.HDFSFileSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * Source for Avro files on Hadoop/HDFS with Simple Authentication.
+ *
+ * Allows setting an arbitrary username as the HDFS user used for reading Avro from HDFS.
+ */
+public class SimpleAuthAvroHDFSFileSource<T> extends AvroHDFSFileSource<T> {
+  // keep this field to pass Hadoop user between workers
+  private final String username;
+
+  /**
+   * Create a {@code SimpleAuthAvroHDFSFileSource} based on a file or a file pattern specification.
+   * @param username HDFS username.
+   */
+  public SimpleAuthAvroHDFSFileSource(String filepattern,
+                                      AvroCoder<T> avroCoder,
+                                      String username) {
+    super(filepattern, avroCoder);
+    this.username = username;
+  }
+
+  /**
+   * Create a {@code SimpleAuthAvroHDFSFileSource} based on a single Hadoop input split, which won't
+   * be split up further.
+   * @param username HDFS username.
+   */
+  public SimpleAuthAvroHDFSFileSource(String filepattern,
+                                      AvroCoder<T> avroCoder,
+                                      HDFSFileSource.SerializableSplit serializableSplit,
+                                      String username) {
+    super(filepattern, avroCoder, serializableSplit);
+    this.username = username;
+  }
+
+  @Override
+  public List<? extends AvroHDFSFileSource<T>> splitIntoBundles(long desiredBundleSizeBytes,
+                                                                PipelineOptions options)
+      throws Exception {
+    if (serializableSplit == null) {
+      return Lists.transform(computeSplits(desiredBundleSizeBytes),
+          new Function<InputSplit, AvroHDFSFileSource<T>>() {
+            @Override
+            public AvroHDFSFileSource<T> apply(@Nullable InputSplit inputSplit) {
+              return new SimpleAuthAvroHDFSFileSource<>(
+                  filepattern, avroCoder, new HDFSFileSource.SerializableSplit(inputSplit),
+                  username);
+            }
+          });
+    } else {
+      return ImmutableList.of(this);
+    }
+  }
+}
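
Usage differs from the plain AvroHDFSFileSource only by the username argument (values are
illustrative, not part of this commit):

  PCollection<KV<AvroKey<MyRecord>, NullWritable>> records = p.apply(Read.from(
      new SimpleAuthAvroHDFSFileSource<MyRecord>(
          "hdfs://namenode/events/*.avro", AvroCoder.of(MyRecord.class), "etl-user")));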

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a17a8b2e/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSink.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSink.java
new file mode 100644
index 0000000..d0fd8b6
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSink.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs.simpleauth;
+
+import org.apache.beam.sdk.io.Sink;
+import org.apache.beam.sdk.io.hdfs.HDFSFileSink;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.values.KV;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * A {@code Sink} for writing records to a Hadoop filesystem using a Hadoop file-based output
+ * format with Simple Authentication.
+ *
+ * Allows setting an arbitrary username as the HDFS user used for writing to HDFS.
+ *
+ * @param <K> The type of keys to be written to the sink.
+ * @param <V> The type of values to be written to the sink.
+ */
+public class SimpleAuthHDFSFileSink<K, V> extends HDFSFileSink<K, V> {
+  private final String username;
+
+  public SimpleAuthHDFSFileSink(String path,
+                                Class<? extends FileOutputFormat<K, V>> formatClass,
+                                Configuration conf,
+                                String username) {
+    super(path, formatClass, conf);
+    this.username = username;
+  }
+
+  @Override
+  public WriteOperation<KV<K, V>, ?> createWriteOperation(PipelineOptions options) {
+    return new SimpleAuthHDFSWriteOperation<>(this, path, formatClass, username);
+  }
+
+  /** {@link WriteOperation} for HDFS with Simple Authentication. */
+  public static class SimpleAuthHDFSWriteOperation<K, V> extends HDFSWriteOperation<K, V> {
+    private final String username;
+
+    SimpleAuthHDFSWriteOperation(Sink<KV<K, V>> sink,
+                                 String path,
+                                 Class<? extends FileOutputFormat<K, V>> formatClass,
+                                 String username) {
+      super(sink, path, formatClass);
+      this.username = username;
+    }
+
+    @Override
+    public void finalize(final Iterable<String> writerResults, final PipelineOptions options)
+        throws Exception {
+      UserGroupInformation.createRemoteUser(username).doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          superFinalize(writerResults, options);
+          return null;
+        }
+      });
+    }
+
+    private void superFinalize(Iterable<String> writerResults, PipelineOptions options)
+        throws Exception {
+      super.finalize(writerResults, options);
+    }
+
+    @Override
+    public Writer<KV<K, V>, String> createWriter(PipelineOptions options) throws Exception {
+      return new SimpleAuthHDFSWriter<>(this, path, formatClass, username);
+    }
+  }
+
+  /** {@link Writer} for HDFS files with Simple Authentication. */
+  public static class SimpleAuthHDFSWriter<K, V> extends HDFSWriter<K, V> {
+    private final UserGroupInformation ugi;
+
+    public SimpleAuthHDFSWriter(SimpleAuthHDFSWriteOperation<K, V> writeOperation,
+                                String path,
+                                Class<? extends FileOutputFormat<K, V>> formatClass,
+                                String username) {
+      super(writeOperation, path, formatClass);
+      ugi = UserGroupInformation.createRemoteUser(username);
+    }
+
+    @Override
+    public void open(final String uId) throws Exception {
+      ugi.doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          superOpen(uId);
+          return null;
+        }
+      });
+    }
+
+    private void superOpen(String uId) throws Exception {
+      super.open(uId);
+    }
+
+    @Override
+    public String close() throws Exception {
+      return ugi.doAs(new PrivilegedExceptionAction<String>() {
+        @Override
+        public String run() throws Exception {
+          return superClose();
+        }
+      });
+    }
+
+    private String superClose() throws Exception {
+      return super.close();
+    }
+  }
+
+}
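
A sketch of the authenticated write (placeholders as in the earlier sketches; open(), close()
and finalize() all run inside doAs() as the given remote user):

  Configuration conf = new Configuration();
  @SuppressWarnings("unchecked")
  Class<? extends FileOutputFormat<Text, LongWritable>> format =
      (Class<? extends FileOutputFormat<Text, LongWritable>>) (Class<?>) TextOutputFormat.class;
  wordCounts.apply(Write.to(new SimpleAuthHDFSFileSink<Text, LongWritable>(
      "hdfs://namenode/output/counts", format, conf, "etl-user")));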

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a17a8b2e/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSource.java
new file mode 100644
index 0000000..5b768fc
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSource.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs.simpleauth;
+
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.hdfs.HDFSFileSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.values.KV;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * Source for Hadoop/HDFS with Simple Authentication.
+ *
+ * Allows setting an arbitrary username as the HDFS user used for reading from HDFS.
+ */
+public class SimpleAuthHDFSFileSource<K, V> extends HDFSFileSource<K, V> {
+  private final String username;
+  /**
+   * Create a {@code SimpleAuthHDFSFileSource} based on a single Hadoop input split, which won't be
+   * split up further.
+   * @param username HDFS username.
+   */
+  protected SimpleAuthHDFSFileSource(String filepattern,
+                                     Class<? extends FileInputFormat<?, ?>> formatClass,
+                                     Class<K> keyClass,
+                                     Class<V> valueClass,
+                                     HDFSFileSource.SerializableSplit serializableSplit,
+                                     String username) {
+    super(filepattern, formatClass, keyClass, valueClass, serializableSplit);
+    this.username = username;
+  }
+
+  /**
+   * Create a {@code SimpleAuthHDFSFileSource} based on a file or a file pattern specification.
+   * @param username HDFS username.
+   */
+  protected SimpleAuthHDFSFileSource(String filepattern,
+                                     Class<? extends FileInputFormat<?, ?>> formatClass,
+                                     Class<K> keyClass,
+                                     Class<V> valueClass,
+                                     String username) {
+    super(filepattern, formatClass, keyClass, valueClass);
+    this.username = username;
+  }
+
+  /**
+   * Creates a {@code Read} transform that will read from a {@code SimpleAuthHDFSFileSource}
+   * with the given file name or pattern ("glob") using the given Hadoop {@link FileInputFormat},
+   * with key-value types specified
+   * by the given key class and value class.
+   * @param username HDFS username.
+   */
+  public static <K, V, T extends FileInputFormat<K, V>> Read.Bounded<KV<K, V>> readFrom(
+      String filepattern,
+      Class<T> formatClass,
+      Class<K> keyClass,
+      Class<V> valueClass,
+      String username) {
+    return Read.from(from(filepattern, formatClass, keyClass, valueClass, username));
+  }
+
+  /**
+   * Creates a {@code SimpleAuthHDFSFileSource} that reads from the given file name or pattern
+   * ("glob") using the given Hadoop {@link FileInputFormat}, with key-value types specified by the
+   * given key class and value class.
+   * @param username HDFS username.
+   */
+  public static <K, V, T extends FileInputFormat<K, V>> HDFSFileSource<K, V> from(
+      String filepattern,
+      Class<T> formatClass,
+      Class<K> keyClass,
+      Class<V> valueClass,
+      String username) {
+    @SuppressWarnings("unchecked")
+    HDFSFileSource<K, V> source = (HDFSFileSource<K, V>)
+        new SimpleAuthHDFSFileSource(filepattern, formatClass, keyClass, valueClass, username);
+    return source;
+  }
+
+  @Override
+  public List<? extends BoundedSource<KV<K, V>>> splitIntoBundles(
+      long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+    if (serializableSplit == null) {
+      return Lists.transform(computeSplits(desiredBundleSizeBytes),
+          new Function<InputSplit, BoundedSource<KV<K, V>>>() {
+            @Nullable
+            @Override
+            public BoundedSource<KV<K, V>> apply(@Nullable InputSplit inputSplit) {
+              return new SimpleAuthHDFSFileSource<>(filepattern, formatClass, keyClass,
+                  valueClass, new HDFSFileSource.SerializableSplit(inputSplit),
+                  username);
+            }
+          });
+    } else {
+      return ImmutableList.of(this);
+    }
+  }
+}
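
The readFrom helper keeps the authenticated read to a single call (path and username are
illustrative; TextInputFormat yields LongWritable offsets and Text lines):

  PCollection<KV<LongWritable, Text>> lines = p.apply(
      SimpleAuthHDFSFileSource.readFrom(
          "hdfs://namenode/logs/*.log",
          TextInputFormat.class, LongWritable.class, Text.class,
          "etl-user"));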

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a17a8b2e/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoderTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoderTest.java
new file mode 100644
index 0000000..85cbd46
--- /dev/null
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoderTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.testing.CoderProperties;
+
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.junit.Test;
+
+/**
+ * Tests for AvroWrapperCoder.
+ */
+public class AvroWrapperCoderTest {
+
+  @Test
+  public void testAvroKeyEncoding() throws Exception {
+    AvroKey<Integer> value = new AvroKey<>(42);
+    AvroWrapperCoder<AvroKey<Integer>, Integer> coder = AvroWrapperCoder.of(
+        AvroHDFSFileSource.ClassUtil.<AvroKey<Integer>>castClass(AvroKey.class),
+        AvroCoder.of(Integer.class));
+
+    CoderProperties.coderDecodeEncodeEqual(coder, value);
+  }
+
+  @Test
+  public void testAvroValueEncoding() throws Exception {
+    AvroValue<Integer> value = new AvroValue<>(42);
+    AvroWrapperCoder<AvroValue<Integer>, Integer> coder = AvroWrapperCoder.of(
+        AvroHDFSFileSource.ClassUtil.<AvroValue<Integer>>castClass(AvroValue.class),
+        AvroCoder.of(Integer.class));
+
+    CoderProperties.coderDecodeEncodeEqual(coder, value);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a17a8b2e/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java
index 715da8e..ac32c33 100644
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.hdfs;
 import org.apache.beam.sdk.testing.CoderProperties;
 
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.junit.Test;
 
 /**
@@ -34,4 +35,12 @@ public class WritableCoderTest {
 
     CoderProperties.coderDecodeEncodeEqual(coder, value);
   }
+
+  @Test
+  public void testNullWritableEncoding() throws Exception {
+    NullWritable value = NullWritable.get();
+    WritableCoder<NullWritable> coder = WritableCoder.of(NullWritable.class);
+
+    CoderProperties.coderDecodeEncodeEqual(coder, value);
+  }
 }