You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/05 16:57:22 UTC

[1/3] beam git commit: [BEAM-2016] Delete HdfsFileSource & Sink

Repository: beam
Updated Branches:
  refs/heads/master 3bffe0e00 -> 610bda168


http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
deleted file mode 100644
index 9fa6606..0000000
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static org.junit.Assert.assertEquals;
-
-import com.google.common.base.MoreObjects;
-import java.io.File;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.UUID;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.KV;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Tests for HDFSFileSinkTest.
- */
-public class HDFSFileSinkTest {
-
-  @Rule
-  public TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  private final String part0 = "part-r-00000";
-  private final String foobar = "foobar";
-
-  private <T> void doWrite(Sink<T> sink,
-                           PipelineOptions options,
-                           Iterable<T> toWrite) throws Exception {
-    Sink.WriteOperation<T, String> writeOperation =
-        (Sink.WriteOperation<T, String>) sink.createWriteOperation();
-    Sink.Writer<T, String> writer = writeOperation.createWriter(options);
-    writer.openUnwindowed(UUID.randomUUID().toString(),  -1, -1);
-    for (T t: toWrite) {
-      writer.write(t);
-    }
-    String writeResult = writer.close();
-    writeOperation.finalize(Collections.singletonList(writeResult), options);
-  }
-
-  @Test
-  public void testWriteSingleRecord() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    File file = tmpFolder.newFolder();
-
-    HDFSFileSink<String, NullWritable, Text> sink =
-        HDFSFileSink.to(
-            file.toString(),
-            SequenceFileOutputFormat.class,
-            NullWritable.class,
-            Text.class,
-            new SerializableFunction<String, KV<NullWritable, Text>>() {
-              @Override
-              public KV<NullWritable, Text> apply(String input) {
-                return KV.of(NullWritable.get(), new Text(input));
-              }
-            });
-
-    doWrite(sink, options, Collections.singletonList(foobar));
-
-    SequenceFile.Reader.Option opts =
-        SequenceFile.Reader.file(new Path(file.toString(), part0));
-    SequenceFile.Reader reader = new SequenceFile.Reader(new Configuration(), opts);
-    assertEquals(NullWritable.class.getName(), reader.getKeyClassName());
-    assertEquals(Text.class.getName(), reader.getValueClassName());
-    NullWritable k = NullWritable.get();
-    Text v = new Text();
-    assertEquals(true, reader.next(k, v));
-    assertEquals(NullWritable.get(), k);
-    assertEquals(new Text(foobar), v);
-  }
-
-  @Test
-  public void testToText() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    File file = tmpFolder.newFolder();
-
-    HDFSFileSink<String, NullWritable, Text> sink = HDFSFileSink.toText(file.toString());
-
-    doWrite(sink, options, Collections.singletonList(foobar));
-
-    List<String> strings = Files.readAllLines(new File(file.toString(), part0).toPath(),
-        Charset.forName("UTF-8"));
-    assertEquals(Collections.singletonList(foobar), strings);
-  }
-
-  @DefaultCoder(AvroCoder.class)
-  static class GenericClass {
-    int intField;
-    String stringField;
-    public GenericClass() {}
-    public GenericClass(int intValue, String stringValue) {
-      this.intField = intValue;
-      this.stringField = stringValue;
-    }
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("intField", intField)
-          .add("stringField", stringField)
-          .toString();
-    }
-    @Override
-    public int hashCode() {
-      return Objects.hash(intField, stringField);
-    }
-    @Override
-    public boolean equals(Object other) {
-      if (other == null || !(other instanceof GenericClass)) {
-        return false;
-      }
-      GenericClass o = (GenericClass) other;
-      return Objects.equals(intField, o.intField) && Objects.equals(stringField, o.stringField);
-    }
-  }
-
-  @Test
-  public void testToAvro() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    File file = tmpFolder.newFolder();
-
-    HDFSFileSink<GenericClass, AvroKey<GenericClass>, NullWritable> sink = HDFSFileSink.toAvro(
-        file.toString(),
-        AvroCoder.of(GenericClass.class),
-        new Configuration(false));
-
-    doWrite(sink, options, Collections.singletonList(new GenericClass(3, "foobar")));
-
-    GenericDatumReader datumReader = new GenericDatumReader();
-    FileReader<GenericData.Record> reader =
-        DataFileReader.openReader(new File(file.getAbsolutePath(), part0 + ".avro"), datumReader);
-    GenericData.Record next = reader.next(null);
-    assertEquals("foobar", next.get("stringField").toString());
-    assertEquals(3, next.get("intField"));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
deleted file mode 100644
index a964239..0000000
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Source;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.SourceTestUtils;
-import org.apache.beam.sdk.values.KV;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Tests for HDFSFileSource.
- */
-public class HDFSFileSourceTest {
-
-  private Random random = new Random(0L);
-
-  @Rule
-  public TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  @Test
-  public void testFullyReadSingleFile() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0);
-    File file = createFileWithData("tmp.seq", expectedResults);
-
-    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
-            HDFSFileSource.from(
-                    file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class);
-
-    assertEquals(file.length(), source.getEstimatedSizeBytes(null));
-
-    assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
-  }
-
-  @Test
-  public void testFullyReadSingleFileWithSpaces() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0);
-    File file = createFileWithData("tmp data.seq", expectedResults);
-
-    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
-            HDFSFileSource.from(
-                    file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class);
-
-    assertEquals(file.length(), source.getEstimatedSizeBytes(null));
-
-    assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
-  }
-
-  @Test
-  public void testFullyReadFilePattern() throws IOException {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
-    File file1 = createFileWithData("file1", data1);
-
-    List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
-    createFileWithData("file2", data2);
-
-    List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
-    createFileWithData("file3", data3);
-
-    List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
-    createFileWithData("otherfile", data4);
-
-    List<KV<IntWritable, Text>> expectedResults = new ArrayList<>();
-    expectedResults.addAll(data1);
-    expectedResults.addAll(data2);
-    expectedResults.addAll(data3);
-
-    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
-        HDFSFileSource.from(
-            new File(file1.getParent(), "file*").toString(), SequenceFileInputFormat.class,
-            IntWritable.class, Text.class);
-
-    assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
-  }
-
-  @Test
-  public void testCloseUnstartedFilePatternReader() throws IOException {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
-    File file1 = createFileWithData("file1", data1);
-
-    List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
-    createFileWithData("file2", data2);
-
-    List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
-    createFileWithData("file3", data3);
-
-    List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
-    createFileWithData("otherfile", data4);
-
-    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
-        HDFSFileSource.from(
-            new File(file1.getParent(), "file*").toString(),
-            SequenceFileInputFormat.class, IntWritable.class, Text.class);
-    Source.Reader<KV<IntWritable, Text>> reader = source.createReader(options);
-
-    // Closing an unstarted FilePatternReader should not throw an exception.
-    try {
-      reader.close();
-    } catch (Exception e) {
-      fail("Closing an unstarted FilePatternReader should not throw an exception");
-    }
-  }
-
-  @Test
-  public void testSplits() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-
-    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
-    File file = createFileWithData("tmp.seq", expectedResults);
-
-    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
-        HDFSFileSource.from(
-            file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class);
-
-    // Assert that the source produces the expected records
-    assertEquals(expectedResults, readFromSource(source, options));
-
-    // Split with a small bundle size (has to be at least size of sync interval)
-    List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source
-        .split(SequenceFile.SYNC_INTERVAL, options);
-    assertTrue(splits.size() > 2);
-    SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
-    int nonEmptySplits = 0;
-    for (BoundedSource<KV<IntWritable, Text>> subSource : splits) {
-      if (readFromSource(subSource, options).size() > 0) {
-        nonEmptySplits += 1;
-      }
-    }
-    assertTrue(nonEmptySplits > 2);
-  }
-
-  @Test
-  public void testSplitEstimatedSize() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-
-    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
-    File file = createFileWithData("tmp.avro", expectedResults);
-
-    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
-        HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
-            IntWritable.class, Text.class);
-
-    long originalSize = source.getEstimatedSizeBytes(options);
-    long splitTotalSize = 0;
-    List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source.split(
-        SequenceFile.SYNC_INTERVAL, options
-    );
-    for (BoundedSource<KV<IntWritable, Text>> splitSource : splits) {
-      splitTotalSize += splitSource.getEstimatedSizeBytes(options);
-    }
-    // Assert that the estimated size of the whole is the sum of its parts
-    assertEquals(originalSize, splitTotalSize);
-  }
-
-  private File createFileWithData(String filename, List<KV<IntWritable, Text>> records)
-      throws IOException {
-    File tmpFile = tmpFolder.newFile(filename);
-    try (Writer writer = SequenceFile.createWriter(new Configuration(),
-        Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
-        Writer.file(new Path(tmpFile.toURI())))) {
-
-      for (KV<IntWritable, Text> record : records) {
-        writer.append(record.getKey(), record.getValue());
-      }
-    }
-    return tmpFile;
-  }
-
-  private List<KV<IntWritable, Text>> createRandomRecords(int dataItemLength,
-                                                          int numItems, int offset) {
-    List<KV<IntWritable, Text>> records = new ArrayList<>();
-    for (int i = 0; i < numItems; i++) {
-      IntWritable key = new IntWritable(i + offset);
-      Text value = new Text(createRandomString(dataItemLength));
-      records.add(KV.of(key, value));
-    }
-    return records;
-  }
-
-  private String createRandomString(int length) {
-    char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
-    StringBuilder builder = new StringBuilder();
-    for (int i = 0; i < length; i++) {
-      builder.append(chars[random.nextInt(chars.length)]);
-    }
-    return builder.toString();
-  }
-
-}


[3/3] beam git commit: This closes #2908

Posted by dh...@apache.org.
This closes #2908


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/610bda16
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/610bda16
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/610bda16

Branch: refs/heads/master
Commit: 610bda1682d132a2fad958743ab173cbfab085cf
Parents: 3bffe0e 7512a73
Author: Dan Halperin <dh...@google.com>
Authored: Fri May 5 09:57:15 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri May 5 09:57:15 2017 -0700

----------------------------------------------------------------------
 sdks/java/io/hadoop-file-system/README.md       |  43 --
 sdks/java/io/hadoop-file-system/pom.xml         |  24 -
 .../apache/beam/sdk/io/hdfs/HDFSFileSink.java   | 478 --------------
 .../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 625 -------------------
 .../java/org/apache/beam/sdk/io/hdfs/Sink.java  | 195 ------
 .../org/apache/beam/sdk/io/hdfs/UGIHelper.java  |  38 --
 .../java/org/apache/beam/sdk/io/hdfs/Write.java | 588 -----------------
 .../apache/beam/sdk/io/hdfs/package-info.java   |   3 +-
 .../beam/sdk/io/hdfs/HDFSFileSinkTest.java      | 172 -----
 .../beam/sdk/io/hdfs/HDFSFileSourceTest.java    | 231 -------
 10 files changed, 2 insertions(+), 2395 deletions(-)
----------------------------------------------------------------------



[2/3] beam git commit: [BEAM-2016] Delete HdfsFileSource & Sink

Posted by dh...@apache.org.
[BEAM-2016] Delete HdfsFileSource & Sink


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7512a73c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7512a73c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7512a73c

Branch: refs/heads/master
Commit: 7512a73cf8aa2a527c89ecb054e92207411ed241
Parents: 3bffe0e
Author: Dan Halperin <dh...@google.com>
Authored: Thu May 4 18:33:16 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri May 5 08:48:04 2017 -0700

----------------------------------------------------------------------
 sdks/java/io/hadoop-file-system/README.md       |  43 --
 sdks/java/io/hadoop-file-system/pom.xml         |  24 -
 .../apache/beam/sdk/io/hdfs/HDFSFileSink.java   | 478 --------------
 .../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 625 -------------------
 .../java/org/apache/beam/sdk/io/hdfs/Sink.java  | 195 ------
 .../org/apache/beam/sdk/io/hdfs/UGIHelper.java  |  38 --
 .../java/org/apache/beam/sdk/io/hdfs/Write.java | 588 -----------------
 .../apache/beam/sdk/io/hdfs/package-info.java   |   3 +-
 .../beam/sdk/io/hdfs/HDFSFileSinkTest.java      | 172 -----
 .../beam/sdk/io/hdfs/HDFSFileSourceTest.java    | 231 -------
 10 files changed, 2 insertions(+), 2395 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/README.md
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/README.md b/sdks/java/io/hadoop-file-system/README.md
deleted file mode 100644
index 3a734f2..0000000
--- a/sdks/java/io/hadoop-file-system/README.md
+++ /dev/null
@@ -1,43 +0,0 @@
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
--->
-
-# HDFS IO
-
-This library provides HDFS sources and sinks to make it possible to read and
-write Apache Hadoop file formats from Apache Beam pipelines.
-
-Currently, only the read path is implemented. A `HDFSFileSource` allows any
-Hadoop `FileInputFormat` to be read as a `PCollection`.
-
-A `HDFSFileSource` can be read from using the
-`org.apache.beam.sdk.io.Read` transform. For example:
-
-```java
-HDFSFileSource<K, V> source = HDFSFileSource.from(path, MyInputFormat.class,
-  MyKey.class, MyValue.class);
-PCollection<KV<MyKey, MyValue>> records = pipeline.apply(Read.from(mySource));
-```
-
-Alternatively, the `readFrom` method is a convenience method that returns a read
-transform. For example:
-
-```java
-PCollection<KV<MyKey, MyValue>> records = pipeline.apply(HDFSFileSource.readFrom(path,
-  MyInputFormat.class, MyKey.class, MyValue.class));
-```

http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/pom.xml b/sdks/java/io/hadoop-file-system/pom.xml
index 562277e..3b392c2 100644
--- a/sdks/java/io/hadoop-file-system/pom.xml
+++ b/sdks/java/io/hadoop-file-system/pom.xml
@@ -82,11 +82,6 @@
     </dependency>
 
     <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-java-io-hadoop-common</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
     </dependency>
@@ -124,25 +119,6 @@
     </dependency>
 
     <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro-mapred</artifactId>
-      <version>${avro.version}</version>
-      <classifier>hadoop2</classifier>
-      <exclusions>
-        <!-- exclude old Jetty version of servlet API -->
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>servlet-api</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
       <scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
deleted file mode 100644
index aee73c4..0000000
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
+++ /dev/null
@@ -1,478 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import java.io.IOException;
-import java.net.URI;
-import java.security.PrivilegedExceptionAction;
-import java.util.Random;
-import java.util.Set;
-import javax.annotation.Nullable;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapreduce.AvroKeyOutputFormat;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.values.KV;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-
-/**
- * A {@link Sink} for writing records to a Hadoop filesystem using a Hadoop file-based
- * output
- * format.
- *
- * <p>To write a {@link org.apache.beam.sdk.values.PCollection} of elements of type T to Hadoop
- * filesystem use {@link HDFSFileSink#to}, specify the path (this can be any Hadoop supported
- * filesystem: HDFS, S3, GCS etc), the Hadoop {@link FileOutputFormat}, the key class K and the
- * value class V and finally the {@link SerializableFunction} to map from T to {@link KV} of K
- * and V.
- *
- * <p>{@code HDFSFileSink} can be used by {@link Write} to create write
- * transform. See example below.
- *
- * <p>{@code HDFSFileSink} comes with helper methods to write text and Apache Avro. For example:
- *
- * <pre>
- * {@code
- * HDFSFileSink<CustomSpecificAvroClass, AvroKey<CustomSpecificAvroClass>, NullWritable> sink =
- *   HDFSFileSink.toAvro(path, AvroCoder.of(CustomSpecificAvroClass.class));
- * avroRecordsPCollection.apply(Write.to(sink));
- * }
- * </pre>
- *
- * @param <T> the type of elements of the input {@link org.apache.beam.sdk.values.PCollection}.
- * @param <K> the type of keys to be written to the sink via {@link FileOutputFormat}.
- * @param <V> the type of values to be written to the sink via {@link FileOutputFormat}.
- */
-@AutoValue
-@Experimental
-public abstract class HDFSFileSink<T, K, V> extends Sink<T> {
-
-  private static final JobID jobId = new JobID(
-      Long.toString(System.currentTimeMillis()),
-      new Random().nextInt(Integer.MAX_VALUE));
-
-  public abstract String path();
-  public abstract Class<? extends FileOutputFormat<K, V>> formatClass();
-  public abstract Class<K> keyClass();
-  public abstract Class<V> valueClass();
-  public abstract SerializableFunction<T, KV<K, V>> outputConverter();
-  public abstract SerializableConfiguration serializableConfiguration();
-  public @Nullable abstract String username();
-  public abstract boolean validate();
-
-  // =======================================================================
-  // Factory methods
-  // =======================================================================
-
-  public static <T, K, V, W extends FileOutputFormat<K, V>> HDFSFileSink<T, K, V>
-  to(String path,
-     Class<W> formatClass,
-     Class<K> keyClass,
-     Class<V> valueClass,
-     SerializableFunction<T, KV<K, V>> outputConverter) {
-    return HDFSFileSink.<T, K, V>builder()
-        .setPath(path)
-        .setFormatClass(formatClass)
-        .setKeyClass(keyClass)
-        .setValueClass(valueClass)
-        .setOutputConverter(outputConverter)
-        .setConfiguration(null)
-        .setUsername(null)
-        .setValidate(true)
-        .build();
-  }
-
-  public static <T> HDFSFileSink<T, NullWritable, Text> toText(String path) {
-    SerializableFunction<T, KV<NullWritable, Text>> outputConverter =
-        new SerializableFunction<T, KV<NullWritable, Text>>() {
-          @Override
-          public KV<NullWritable, Text> apply(T input) {
-            return KV.of(NullWritable.get(), new Text(input.toString()));
-          }
-        };
-    return to(path, TextOutputFormat.class, NullWritable.class, Text.class, outputConverter);
-  }
-
-  /**
-   * Helper to create Avro sink given {@link AvroCoder}. Keep in mind that configuration
-   * object is altered to enable Avro output.
-   */
-  public static <T> HDFSFileSink<T, AvroKey<T>, NullWritable> toAvro(String path,
-                                                                     final AvroCoder<T> coder,
-                                                                     Configuration conf) {
-    SerializableFunction<T, KV<AvroKey<T>, NullWritable>> outputConverter =
-        new SerializableFunction<T, KV<AvroKey<T>, NullWritable>>() {
-          @Override
-          public KV<AvroKey<T>, NullWritable> apply(T input) {
-            return KV.of(new AvroKey<>(input), NullWritable.get());
-          }
-        };
-    conf.set("avro.schema.output.key", coder.getSchema().toString());
-    return to(
-        path,
-        AvroKeyOutputFormat.class,
-        (Class<AvroKey<T>>) (Class<?>) AvroKey.class,
-        NullWritable.class,
-        outputConverter).withConfiguration(conf);
-  }
-
-  /**
-   * Helper to create Avro sink given {@link Schema}. Keep in mind that configuration
-   * object is altered to enable Avro output.
-   */
-  public static HDFSFileSink<GenericRecord, AvroKey<GenericRecord>, NullWritable>
-  toAvro(String path, Schema schema, Configuration conf) {
-    return toAvro(path, AvroCoder.of(schema), conf);
-  }
-
-  /**
-   * Helper to create Avro sink given {@link Class}. Keep in mind that configuration
-   * object is altered to enable Avro output.
-   */
-  public static <T> HDFSFileSink<T, AvroKey<T>, NullWritable> toAvro(String path,
-                                                                     Class<T> cls,
-                                                                     Configuration conf) {
-    return toAvro(path, AvroCoder.of(cls), conf);
-  }
-
-  // =======================================================================
-  // Builder methods
-  // =======================================================================
-
-  public abstract Builder<T, K, V> toBuilder();
-  public static <T, K, V> Builder builder() {
-    return new AutoValue_HDFSFileSink.Builder<>();
-  }
-
-  /**
-   * AutoValue builder for {@link HDFSFileSink}.
-   */
-  @AutoValue.Builder
-  public abstract static class Builder<T, K, V> {
-    public abstract Builder<T, K, V> setPath(String path);
-    public abstract Builder<T, K, V> setFormatClass(
-        Class<? extends FileOutputFormat<K, V>> formatClass);
-    public abstract Builder<T, K, V> setKeyClass(Class<K> keyClass);
-    public abstract Builder<T, K, V> setValueClass(Class<V> valueClass);
-    public abstract Builder<T, K, V> setOutputConverter(
-        SerializableFunction<T, KV<K, V>> outputConverter);
-    public abstract Builder<T, K, V> setSerializableConfiguration(
-        SerializableConfiguration serializableConfiguration);
-    public Builder<T, K, V> setConfiguration(@Nullable Configuration configuration) {
-      if (configuration == null) {
-        configuration = new Configuration(false);
-      }
-      return this.setSerializableConfiguration(new SerializableConfiguration(configuration));
-    }
-    public abstract Builder<T, K, V> setUsername(String username);
-    public abstract Builder<T, K, V> setValidate(boolean validate);
-    public abstract HDFSFileSink<T, K, V> build();
-  }
-
-  public HDFSFileSink<T, K, V> withConfiguration(@Nullable Configuration configuration) {
-    return this.toBuilder().setConfiguration(configuration).build();
-  }
-
-  public HDFSFileSink<T, K, V> withUsername(@Nullable String username) {
-    return this.toBuilder().setUsername(username).build();
-  }
-
-  // =======================================================================
-  // Sink
-  // =======================================================================
-
-  @Override
-  public void validate(PipelineOptions options) {
-    if (validate()) {
-      try {
-        UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Void>() {
-          @Override
-          public Void run() throws Exception {
-            FileSystem fs = FileSystem.get(new URI(path()),
-                SerializableConfiguration.newConfiguration(serializableConfiguration()));
-            checkState(!fs.exists(new Path(path())), "Output path %s already exists", path());
-            return null;
-          }
-        });
-      } catch (IOException | InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  @Override
-  public Sink.WriteOperation<T, String> createWriteOperation() {
-    return new HDFSWriteOperation<>(this, path(), formatClass());
-  }
-
-  private Job newJob() throws IOException {
-    Job job = SerializableConfiguration.newJob(serializableConfiguration());
-    job.setJobID(jobId);
-    job.setOutputKeyClass(keyClass());
-    job.setOutputValueClass(valueClass());
-    return job;
-  }
-
-  // =======================================================================
-  // WriteOperation
-  // =======================================================================
-
-  /** {{@link WriteOperation}} for HDFS. */
-  private static class HDFSWriteOperation<T, K, V> extends WriteOperation<T, String> {
-
-    private final HDFSFileSink<T, K, V> sink;
-    private final String path;
-    private final Class<? extends FileOutputFormat<K, V>> formatClass;
-
-    HDFSWriteOperation(HDFSFileSink<T, K, V> sink,
-                       String path,
-                       Class<? extends FileOutputFormat<K, V>> formatClass) {
-      this.sink = sink;
-      this.path = path;
-      this.formatClass = formatClass;
-    }
-
-    @Override
-    public void initialize(PipelineOptions options) throws Exception {
-      Job job = sink.newJob();
-      FileOutputFormat.setOutputPath(job, new Path(path));
-    }
-
-    @Override
-    public void setWindowedWrites(boolean windowedWrites) {
-    }
-
-    @Override
-    public void finalize(final Iterable<String> writerResults, PipelineOptions options)
-        throws Exception {
-      UGIHelper.getBestUGI(sink.username()).doAs(new PrivilegedExceptionAction<Void>() {
-        @Override
-        public Void run() throws Exception {
-          doFinalize(writerResults);
-          return null;
-        }
-      });
-    }
-
-    private void doFinalize(Iterable<String> writerResults) throws Exception {
-      Job job = sink.newJob();
-      FileSystem fs = FileSystem.get(new URI(path), job.getConfiguration());
-      // If there are 0 output shards, just create output folder.
-      if (!writerResults.iterator().hasNext()) {
-        fs.mkdirs(new Path(path));
-        return;
-      }
-
-      // job successful
-      JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
-      FileOutputCommitter outputCommitter = new FileOutputCommitter(new Path(path), context);
-      outputCommitter.commitJob(context);
-
-      // get actual output shards
-      Set<String> actual = Sets.newHashSet();
-      FileStatus[] statuses = fs.listStatus(new Path(path), new PathFilter() {
-        @Override
-        public boolean accept(Path path) {
-          String name = path.getName();
-          return !name.startsWith("_") && !name.startsWith(".");
-        }
-      });
-
-      // get expected output shards
-      Set<String> expected = Sets.newHashSet(writerResults);
-      checkState(
-          expected.size() == Lists.newArrayList(writerResults).size(),
-          "Data loss due to writer results hash collision");
-      for (FileStatus s : statuses) {
-        String name = s.getPath().getName();
-        int pos = name.indexOf('.');
-        actual.add(pos > 0 ? name.substring(0, pos) : name);
-      }
-
-      checkState(actual.equals(expected), "Writer results and output files do not match");
-
-      // rename output shards to Hadoop style, i.e. part-r-00000.txt
-      int i = 0;
-      for (FileStatus s : statuses) {
-        String name = s.getPath().getName();
-        int pos = name.indexOf('.');
-        String ext = pos > 0 ? name.substring(pos) : "";
-        fs.rename(
-            s.getPath(),
-            new Path(s.getPath().getParent(), String.format("part-r-%05d%s", i, ext)));
-        i++;
-      }
-    }
-
-    @Override
-    public Writer<T, String> createWriter(PipelineOptions options) throws Exception {
-      return new HDFSWriter<>(this, path, formatClass);
-    }
-
-    @Override
-    public Sink<T> getSink() {
-      return sink;
-    }
-
-    @Override
-    public Coder<String> getWriterResultCoder() {
-      return StringUtf8Coder.of();
-    }
-
-  }
-
-  // =======================================================================
-  // Writer
-  // =======================================================================
-
-  private static class HDFSWriter<T, K, V> extends Writer<T, String> {
-
-    private final HDFSWriteOperation<T, K, V> writeOperation;
-    private final String path;
-    private final Class<? extends FileOutputFormat<K, V>> formatClass;
-
-    // unique hash for each task
-    private int hash;
-
-    private TaskAttemptContext context;
-    private RecordWriter<K, V> recordWriter;
-    private FileOutputCommitter outputCommitter;
-
-    HDFSWriter(HDFSWriteOperation<T, K, V> writeOperation,
-               String path,
-               Class<? extends FileOutputFormat<K, V>> formatClass) {
-      this.writeOperation = writeOperation;
-      this.path = path;
-      this.formatClass = formatClass;
-    }
-
-    @Override
-    public void openWindowed(final String uId,
-                             BoundedWindow window,
-                             PaneInfo paneInfo,
-                             int shard,
-                             int numShards) throws Exception {
-      throw new UnsupportedOperationException("Windowing support not implemented yet for"
-          + "HDFS. Window " + window);
-    }
-
-    @Override
-    public void openUnwindowed(final String uId, int shard, int numShards) throws Exception {
-      UGIHelper.getBestUGI(writeOperation.sink.username()).doAs(
-          new PrivilegedExceptionAction<Void>() {
-            @Override
-            public Void run() throws Exception {
-              doOpen(uId);
-              return null;
-            }
-          }
-      );
-    }
-
-    private void doOpen(String uId) throws Exception {
-      this.hash = uId.hashCode();
-
-      Job job = writeOperation.sink.newJob();
-      FileOutputFormat.setOutputPath(job, new Path(path));
-
-      // Each Writer is responsible for writing one bundle of elements and is represented by one
-      // unique Hadoop task based on uId/hash. All tasks share the same job ID.
-      JobID jobId = job.getJobID();
-      TaskID taskId = new TaskID(jobId, TaskType.REDUCE, hash);
-      context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID(taskId, 0));
-
-      FileOutputFormat<K, V> outputFormat = formatClass.newInstance();
-      recordWriter = outputFormat.getRecordWriter(context);
-      outputCommitter = (FileOutputCommitter) outputFormat.getOutputCommitter(context);
-    }
-
-    @Override
-    public void write(T value) throws Exception {
-      checkNotNull(recordWriter,
-          "Record writer can't be null. Make sure to open Writer first!");
-      KV<K, V> kv = writeOperation.sink.outputConverter().apply(value);
-      recordWriter.write(kv.getKey(), kv.getValue());
-    }
-
-    @Override
-    public void cleanup() throws Exception {
-
-    }
-
-    @Override
-    public String close() throws Exception {
-      return UGIHelper.getBestUGI(writeOperation.sink.username()).doAs(
-          new PrivilegedExceptionAction<String>() {
-            @Override
-            public String run() throws Exception {
-              return doClose();
-            }
-          });
-    }
-
-    private String doClose() throws Exception {
-      // task/attempt successful
-      recordWriter.close(context);
-      outputCommitter.commitTask(context);
-
-      // result is prefix of the output file name
-      return String.format("part-r-%d", hash);
-    }
-
-    @Override
-    public WriteOperation<T, String> getWriteOperation() {
-      return writeOperation;
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
deleted file mode 100644
index 5cc2097..0000000
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
+++ /dev/null
@@ -1,625 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.security.PrivilegedExceptionAction;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapreduce.AvroKeyInputFormat;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
-import org.apache.beam.sdk.io.hadoop.WritableCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.values.KV;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@code BoundedSource} for reading files resident in a Hadoop filesystem using a
- * Hadoop file-based input format.
- *
- * <p>To read a {@link org.apache.beam.sdk.values.PCollection} of
- * {@link org.apache.beam.sdk.values.KV} key-value pairs from one or more
- * Hadoop files, use {@link HDFSFileSource#from} to specify the path(s) of the files to
- * read, the Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}, the
- * key class and the value class.
- *
- * <p>A {@code HDFSFileSource} can be read from using the
- * {@link org.apache.beam.sdk.io.Read} transform. For example:
- *
- * <pre>
- * {@code
- * HDFSFileSource<K, V> source = HDFSFileSource.from(path, MyInputFormat.class,
- *   MyKey.class, MyValue.class);
- * PCollection<KV<MyKey, MyValue>> records = pipeline.apply(Read.from(mySource));
- * }
- * </pre>
- *
- * <p>Implementation note: Since Hadoop's
- * {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}
- * determines the input splits, this class extends {@link BoundedSource} rather than
- * {@link org.apache.beam.sdk.io.OffsetBasedSource}, since the latter
- * dictates input splits.
- * @param <T> the type of elements of the result {@link org.apache.beam.sdk.values.PCollection}.
- * @param <K> the type of keys to be read from the source via {@link FileInputFormat}.
- * @param <V> the type of values to be read from the source via {@link FileInputFormat}.
- */
-@AutoValue
-@Experimental
-public abstract class HDFSFileSource<T, K, V> extends BoundedSource<T> {
-  private static final long serialVersionUID = 0L;
-
-  private static final Logger LOG = LoggerFactory.getLogger(HDFSFileSource.class);
-
-  public abstract String filepattern();
-  public abstract Class<? extends FileInputFormat<K, V>> formatClass();
-  public abstract Coder<T> coder();
-  public abstract SerializableFunction<KV<K, V>, T> inputConverter();
-  public abstract SerializableConfiguration serializableConfiguration();
-  public @Nullable abstract SerializableSplit serializableSplit();
-  public @Nullable abstract String username();
-  public abstract boolean validateSource();
-
-  // =======================================================================
-  // Factory methods
-  // =======================================================================
-
-  public static <T, K, V, W extends FileInputFormat<K, V>> HDFSFileSource<T, K, V>
-  from(String filepattern,
-       Class<W> formatClass,
-       Coder<T> coder,
-       SerializableFunction<KV<K, V>, T> inputConverter) {
-    return HDFSFileSource.<T, K, V>builder()
-        .setFilepattern(filepattern)
-        .setFormatClass(formatClass)
-        .setCoder(coder)
-        .setInputConverter(inputConverter)
-        .setConfiguration(null)
-        .setUsername(null)
-        .setValidateSource(true)
-        .setSerializableSplit(null)
-        .build();
-  }
-
-  public static <K, V, W extends FileInputFormat<K, V>> HDFSFileSource<KV<K, V>, K, V>
-  from(String filepattern,
-       Class<W> formatClass,
-       Class<K> keyClass,
-       Class<V> valueClass) {
-    KvCoder<K, V> coder = KvCoder.of(getDefaultCoder(keyClass), getDefaultCoder(valueClass));
-    SerializableFunction<KV<K, V>, KV<K, V>> inputConverter =
-        new SerializableFunction<KV<K, V>, KV<K, V>>() {
-          @Override
-          public KV<K, V> apply(KV<K, V> input) {
-            return input;
-          }
-        };
-    return HDFSFileSource.<KV<K, V>, K, V>builder()
-        .setFilepattern(filepattern)
-        .setFormatClass(formatClass)
-        .setCoder(coder)
-        .setInputConverter(inputConverter)
-        .setConfiguration(null)
-        .setUsername(null)
-        .setValidateSource(true)
-        .setSerializableSplit(null)
-        .build();
-  }
-
-  public static HDFSFileSource<String, LongWritable, Text>
-  fromText(String filepattern) {
-    SerializableFunction<KV<LongWritable, Text>, String> inputConverter =
-        new SerializableFunction<KV<LongWritable, Text>, String>() {
-      @Override
-      public String apply(KV<LongWritable, Text> input) {
-        return input.getValue().toString();
-      }
-    };
-    return from(filepattern, TextInputFormat.class, StringUtf8Coder.of(), inputConverter);
-  }
-
-  /**
-   * Helper to read from Avro source given {@link AvroCoder}. Keep in mind that configuration
-   * object is altered to enable Avro input.
-   */
-  public static <T> HDFSFileSource<T, AvroKey<T>, NullWritable>
-  fromAvro(String filepattern, final AvroCoder<T> coder, Configuration conf) {
-    Class<AvroKeyInputFormat<T>> formatClass = castClass(AvroKeyInputFormat.class);
-    SerializableFunction<KV<AvroKey<T>, NullWritable>, T> inputConverter =
-        new SerializableFunction<KV<AvroKey<T>, NullWritable>, T>() {
-          @Override
-          public T apply(KV<AvroKey<T>, NullWritable> input) {
-            try {
-              return CoderUtils.clone(coder, input.getKey().datum());
-            } catch (CoderException e) {
-              throw new RuntimeException(e);
-            }
-          }
-        };
-    conf.set("avro.schema.input.key", coder.getSchema().toString());
-    return from(filepattern, formatClass, coder, inputConverter).withConfiguration(conf);
-  }
-
-  /**
-   * Helper to read from Avro source given {@link Schema}. Keep in mind that configuration
-   * object is altered to enable Avro input.
-   */
-  public static HDFSFileSource<GenericRecord, AvroKey<GenericRecord>, NullWritable>
-  fromAvro(String filepattern, Schema schema, Configuration conf) {
-    return fromAvro(filepattern, AvroCoder.of(schema), conf);
-  }
-
-  /**
-   * Helper to read from Avro source given {@link Class}. Keep in mind that configuration
-   * object is altered to enable Avro input.
-   */
-  public static <T> HDFSFileSource<T, AvroKey<T>, NullWritable>
-  fromAvro(String filepattern, Class<T> cls, Configuration conf) {
-    return fromAvro(filepattern, AvroCoder.of(cls), conf);
-  }
-
-  // =======================================================================
-  // Builder methods
-  // =======================================================================
-
-  public abstract HDFSFileSource.Builder<T, K, V> toBuilder();
-  public static <T, K, V> HDFSFileSource.Builder builder() {
-    return new AutoValue_HDFSFileSource.Builder<>();
-  }
-
-  /**
-   * AutoValue builder for {@link HDFSFileSource}.
-   */
-  @AutoValue.Builder
-  public abstract static class Builder<T, K, V> {
-    public abstract Builder<T, K, V> setFilepattern(String filepattern);
-    public abstract Builder<T, K, V> setFormatClass(
-        Class<? extends FileInputFormat<K, V>> formatClass);
-    public abstract Builder<T, K, V> setCoder(Coder<T> coder);
-    public abstract Builder<T, K, V> setInputConverter(
-        SerializableFunction<KV<K, V>, T> inputConverter);
-    public abstract Builder<T, K, V> setSerializableConfiguration(
-        SerializableConfiguration serializableConfiguration);
-    public Builder<T, K, V> setConfiguration(Configuration configuration) {
-      if (configuration == null) {
-        configuration = new Configuration(false);
-      }
-      return this.setSerializableConfiguration(new SerializableConfiguration(configuration));
-    }
-    public abstract Builder<T, K, V> setSerializableSplit(SerializableSplit serializableSplit);
-    public abstract Builder<T, K, V> setUsername(@Nullable String username);
-    public abstract Builder<T, K, V> setValidateSource(boolean validate);
-    public abstract HDFSFileSource<T, K, V> build();
-  }
-
-  public HDFSFileSource<T, K, V> withConfiguration(@Nullable Configuration configuration) {
-    return this.toBuilder().setConfiguration(configuration).build();
-  }
-
-  public HDFSFileSource<T, K, V> withUsername(@Nullable String username) {
-    return this.toBuilder().setUsername(username).build();
-  }
-
-  // =======================================================================
-  // BoundedSource
-  // =======================================================================
-
-  @Override
-  public List<? extends BoundedSource<T>> split(
-      final long desiredBundleSizeBytes,
-      PipelineOptions options) throws Exception {
-    if (serializableSplit() == null) {
-      List<InputSplit> inputSplits = UGIHelper.getBestUGI(username()).doAs(
-          new PrivilegedExceptionAction<List<InputSplit>>() {
-            @Override
-            public List<InputSplit> run() throws Exception {
-              return computeSplits(desiredBundleSizeBytes, serializableConfiguration());
-            }
-          });
-      return Lists.transform(inputSplits,
-          new Function<InputSplit, BoundedSource<T>>() {
-            @Override
-            public BoundedSource<T> apply(@Nullable InputSplit inputSplit) {
-              SerializableSplit serializableSplit = new SerializableSplit(inputSplit);
-              return HDFSFileSource.this.toBuilder()
-                  .setSerializableSplit(serializableSplit)
-                  .build();
-            }
-          });
-    } else {
-      return ImmutableList.of(this);
-    }
-  }
-
-  @Override
-  public long getEstimatedSizeBytes(PipelineOptions options) {
-    long size = 0;
-
-    try {
-      // If this source represents a split from split,
-      // then return the size of the split, rather then the entire input
-      if (serializableSplit() != null) {
-        return serializableSplit().getSplit().getLength();
-      }
-
-      size += UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Long>() {
-        @Override
-        public Long run() throws Exception {
-          long size = 0;
-          Job job = SerializableConfiguration.newJob(serializableConfiguration());
-          for (FileStatus st : listStatus(createFormat(job), job)) {
-            size += st.getLen();
-          }
-          return size;
-        }
-      });
-    } catch (IOException e) {
-      LOG.warn(
-          "Will estimate size of input to be 0 bytes. Can't estimate size of the input due to:", e);
-      // ignore, and return 0
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOG.warn(
-          "Will estimate size of input to be 0 bytes. Can't estimate size of the input due to:", e);
-      // ignore, and return 0
-    }
-    return size;
-  }
-
-  @Override
-  public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
-    this.validate();
-    return new HDFSFileReader<>(this, filepattern(), formatClass(), serializableSplit());
-  }
-
-  @Override
-  public void validate() {
-    if (validateSource()) {
-      try {
-        UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Void>() {
-              @Override
-              public Void run() throws Exception {
-                final Path pathPattern = new Path(filepattern());
-                FileSystem fs = FileSystem.get(pathPattern.toUri(),
-                    SerializableConfiguration.newConfiguration(serializableConfiguration()));
-                FileStatus[] fileStatuses = fs.globStatus(pathPattern);
-                checkState(
-                    fileStatuses != null && fileStatuses.length > 0,
-                    "Unable to find any files matching %s", filepattern());
-                  return null;
-                }
-              });
-      } catch (IOException | InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  @Override
-  public Coder<T> getDefaultOutputCoder() {
-    return coder();
-  }
-
-  // =======================================================================
-  // Helpers
-  // =======================================================================
-
-  private List<InputSplit> computeSplits(long desiredBundleSizeBytes,
-                                         SerializableConfiguration serializableConfiguration)
-      throws IOException, IllegalAccessException, InstantiationException {
-    Job job = SerializableConfiguration.newJob(serializableConfiguration);
-    FileInputFormat.setMinInputSplitSize(job, desiredBundleSizeBytes);
-    FileInputFormat.setMaxInputSplitSize(job, desiredBundleSizeBytes);
-    return createFormat(job).getSplits(job);
-  }
-
-  private FileInputFormat<K, V> createFormat(Job job)
-      throws IOException, IllegalAccessException, InstantiationException {
-    Path path = new Path(filepattern());
-    FileInputFormat.addInputPath(job, path);
-    return formatClass().newInstance();
-  }
-
-  private List<FileStatus> listStatus(FileInputFormat<K, V> format, Job job)
-      throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
-    // FileInputFormat#listStatus is protected, so call using reflection
-    Method listStatus = FileInputFormat.class.getDeclaredMethod("listStatus", JobContext.class);
-    listStatus.setAccessible(true);
-    @SuppressWarnings("unchecked")
-    List<FileStatus> stat = (List<FileStatus>) listStatus.invoke(format, job);
-    return stat;
-  }
-
-  @SuppressWarnings("unchecked")
-  private static <T> Coder<T> getDefaultCoder(Class<T> c) {
-    if (Writable.class.isAssignableFrom(c)) {
-      Class<? extends Writable> writableClass = (Class<? extends Writable>) c;
-      return (Coder<T>) WritableCoder.of(writableClass);
-    } else if (Void.class.equals(c)) {
-      return (Coder<T>) VoidCoder.of();
-    }
-    // TODO: how to use registered coders here?
-    throw new IllegalStateException("Cannot find coder for " + c);
-  }
-
-  @SuppressWarnings("unchecked")
-  private static <T> Class<T> castClass(Class<?> aClass) {
-    return (Class<T>) aClass;
-  }
-
-  // =======================================================================
-  // BoundedReader
-  // =======================================================================
-
-  private static class HDFSFileReader<T, K, V> extends BoundedSource.BoundedReader<T> {
-
-    private final HDFSFileSource<T, K, V> source;
-    private final String filepattern;
-    private final Class<? extends FileInputFormat<K, V>> formatClass;
-    private final Job job;
-
-    private List<InputSplit> splits;
-    private ListIterator<InputSplit> splitsIterator;
-
-    private Configuration conf;
-    private FileInputFormat<?, ?> format;
-    private TaskAttemptContext attemptContext;
-    private RecordReader<K, V> currentReader;
-    private KV<K, V> currentPair;
-
-    HDFSFileReader(
-        HDFSFileSource<T, K, V> source,
-        String filepattern,
-        Class<? extends FileInputFormat<K, V>> formatClass,
-        SerializableSplit serializableSplit)
-        throws IOException {
-      this.source = source;
-      this.filepattern = filepattern;
-      this.formatClass = formatClass;
-      this.job = SerializableConfiguration.newJob(source.serializableConfiguration());
-
-      if (serializableSplit != null) {
-        this.splits = ImmutableList.of(serializableSplit.getSplit());
-        this.splitsIterator = splits.listIterator();
-      }
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      Path path = new Path(filepattern);
-      FileInputFormat.addInputPath(job, path);
-
-      conf = job.getConfiguration();
-      try {
-        format = formatClass.newInstance();
-      } catch (InstantiationException | IllegalAccessException e) {
-        throw new IOException("Cannot instantiate file input format " + formatClass, e);
-      }
-      attemptContext = new TaskAttemptContextImpl(conf, new TaskAttemptID());
-
-      if (splitsIterator == null) {
-        splits = format.getSplits(job);
-        splitsIterator = splits.listIterator();
-      }
-
-      return advance();
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      try {
-        if (currentReader != null && currentReader.nextKeyValue()) {
-          currentPair = nextPair();
-          return true;
-        } else {
-          while (splitsIterator.hasNext()) {
-            // advance the reader and see if it has records
-            final InputSplit nextSplit = splitsIterator.next();
-            @SuppressWarnings("unchecked")
-            RecordReader<K, V> reader =
-                (RecordReader<K, V>) format.createRecordReader(nextSplit, attemptContext);
-            if (currentReader != null) {
-              currentReader.close();
-            }
-            currentReader = reader;
-            UGIHelper.getBestUGI(source.username()).doAs(new PrivilegedExceptionAction<Void>() {
-              @Override
-              public Void run() throws Exception {
-                currentReader.initialize(nextSplit, attemptContext);
-                return null;
-              }
-            });
-            if (currentReader.nextKeyValue()) {
-              currentPair = nextPair();
-              return true;
-            }
-            currentReader.close();
-            currentReader = null;
-          }
-          // either no next split or all readers were empty
-          currentPair = null;
-          return false;
-        }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new IOException(e);
-      }
-    }
-
-    @Override
-    public T getCurrent() throws NoSuchElementException {
-      if (currentPair == null) {
-        throw new NoSuchElementException();
-      }
-      return source.inputConverter().apply(currentPair);
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (currentReader != null) {
-        currentReader.close();
-        currentReader = null;
-      }
-      currentPair = null;
-    }
-
-    @Override
-    public BoundedSource<T> getCurrentSource() {
-      return source;
-    }
-
-    @SuppressWarnings("unchecked")
-    private KV<K, V> nextPair() throws IOException, InterruptedException {
-      K key = currentReader.getCurrentKey();
-      V value = currentReader.getCurrentValue();
-      // clone Writable objects since they are reused between calls to RecordReader#nextKeyValue
-      if (key instanceof Writable) {
-        key = (K) WritableUtils.clone((Writable) key, conf);
-      }
-      if (value instanceof Writable) {
-        value = (V) WritableUtils.clone((Writable) value, conf);
-      }
-      return KV.of(key, value);
-    }
-
-    // =======================================================================
-    // Optional overrides
-    // =======================================================================
-
-    @Override
-    public Double getFractionConsumed() {
-      if (currentReader == null) {
-        return 0.0;
-      }
-      if (splits.isEmpty()) {
-        return 1.0;
-      }
-      int index = splitsIterator.previousIndex();
-      int numReaders = splits.size();
-      if (index == numReaders) {
-        return 1.0;
-      }
-      double before = 1.0 * index / numReaders;
-      double after = 1.0 * (index + 1) / numReaders;
-      Double fractionOfCurrentReader = getProgress();
-      if (fractionOfCurrentReader == null) {
-        return before;
-      }
-      return before + fractionOfCurrentReader * (after - before);
-    }
-
-    private Double getProgress() {
-      try {
-        return (double) currentReader.getProgress();
-      } catch (IOException | InterruptedException e) {
-        return null;
-      }
-    }
-
-  }
-
-  // =======================================================================
-  // SerializableSplit
-  // =======================================================================
-
-  /**
-   * A wrapper to allow Hadoop {@link org.apache.hadoop.mapreduce.InputSplit}s to be
-   * serialized using Java's standard serialization mechanisms. Note that the InputSplit
-   * has to be Writable (which most are).
-   */
-  protected static class SerializableSplit implements Externalizable {
-    private static final long serialVersionUID = 0L;
-
-    private InputSplit split;
-
-    public SerializableSplit() {
-    }
-
-    public SerializableSplit(InputSplit split) {
-      checkArgument(split instanceof Writable, "Split is not writable: %s", split);
-      this.split = split;
-    }
-
-    public InputSplit getSplit() {
-      return split;
-    }
-
-    @Override
-    public void writeExternal(ObjectOutput out) throws IOException {
-      out.writeUTF(split.getClass().getCanonicalName());
-      ((Writable) split).write(out);
-    }
-
-    @Override
-    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-      String className = in.readUTF();
-      try {
-        split = (InputSplit) Class.forName(className).newInstance();
-        ((Writable) split).readFields(in);
-      } catch (InstantiationException | IllegalAccessException e) {
-        throw new IOException(e);
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
deleted file mode 100644
index fe2db5f..0000000
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-
-/**
- * This class is deprecated, and only exists for HDFSFileSink.
- */
-@Deprecated
-public abstract class Sink<T> implements Serializable, HasDisplayData {
-  /**
-   * Ensures that the sink is valid and can be written to before the write operation begins. One
-   * should use {@link com.google.common.base.Preconditions} to implement this method.
-   */
-  public abstract void validate(PipelineOptions options);
-
-  /**
-   * Returns an instance of a {@link WriteOperation} that can write to this Sink.
-   */
-  public abstract WriteOperation<T, ?> createWriteOperation();
-
-  /**
-   * {@inheritDoc}
-   *
-   * <p>By default, does not register any display data. Implementors may override this method
-   * to provide their own display data.
-   */
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {}
-
-  /**
-   * A {@link WriteOperation} defines the process of a parallel write of objects to a Sink.
-   *
-   * <p>The {@code WriteOperation} defines how to perform initialization and finalization of a
-   * parallel write to a sink as well as how to create a {@link Sink.Writer} object that can write
-   * a bundle to the sink.
-   *
-   * <p>Since operations in Beam may be run multiple times for redundancy or fault-tolerance,
-   * the initialization and finalization defined by a WriteOperation <b>must be idempotent</b>.
-   *
-   * <p>{@code WriteOperation}s may be mutable; a {@code WriteOperation} is serialized after the
-   * call to {@code initialize} method and deserialized before calls to
-   * {@code createWriter} and {@code finalized}. However, it is not
-   * reserialized after {@code createWriter}, so {@code createWriter} should not mutate the
-   * state of the {@code WriteOperation}.
-   *
-   * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink.
-   *
-   * @param <T> The type of objects to write
-   * @param <WriteT> The result of a per-bundle write
-   */
-  public abstract static class WriteOperation<T, WriteT> implements Serializable {
-    /**
-     * Performs initialization before writing to the sink. Called before writing begins.
-     */
-    public abstract void initialize(PipelineOptions options) throws Exception;
-
-    /**
-     * Indicates that the operation will be performing windowed writes.
-     */
-    public abstract void setWindowedWrites(boolean windowedWrites);
-
-    /**
-     * Given an Iterable of results from bundle writes, performs finalization after writing and
-     * closes the sink. Called after all bundle writes are complete.
-     *
-     * <p>The results that are passed to finalize are those returned by bundles that completed
-     * successfully. Although bundles may have been run multiple times (for fault-tolerance), only
-     * one writer result will be passed to finalize for each bundle. An implementation of finalize
-     * should perform clean up of any failed and successfully retried bundles.  Note that these
-     * failed bundles will not have their writer result passed to finalize, so finalize should be
-     * capable of locating any temporary/partial output written by failed bundles.
-     *
-     * <p>A best practice is to make finalize atomic. If this is impossible given the semantics
-     * of the sink, finalize should be idempotent, as it may be called multiple times in the case of
-     * failure/retry or for redundancy.
-     *
-     * <p>Note that the iteration order of the writer results is not guaranteed to be consistent if
-     * finalize is called multiple times.
-     *
-     * @param writerResults an Iterable of results from successful bundle writes.
-     */
-    public abstract void finalize(Iterable<WriteT> writerResults, PipelineOptions options)
-        throws Exception;
-
-    /**
-     * Creates a new {@link Sink.Writer} to write a bundle of the input to the sink.
-     *
-     * <p>The bundle id that the writer will use to uniquely identify its output will be passed to
-     * {@link Writer#openWindowed} or {@link Writer#openUnwindowed}.
-     *
-     * <p>Must not mutate the state of the WriteOperation.
-     */
-    public abstract Writer<T, WriteT> createWriter(PipelineOptions options) throws Exception;
-
-    /**
-     * Returns the Sink that this write operation writes to.
-     */
-    public abstract Sink<T> getSink();
-
-    /**
-     * Returns a coder for the writer result type.
-     */
-    public abstract Coder<WriteT> getWriterResultCoder();
-  }
-
-  /**
-   * A Writer writes a bundle of elements from a PCollection to a sink.
-   * {@link Writer#openWindowed} or {@link Writer#openUnwindowed} is called before writing begins
-   * and {@link Writer#close} is called after all elements in the bundle have been written.
-   * {@link Writer#write} writes an element to the sink.
-   *
-   * <p>Note that any access to static members or methods of a Writer must be thread-safe, as
-   * multiple instances of a Writer may be instantiated in different threads on the same worker.
-   *
-   * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink.
-   *
-   * @param <T> The type of object to write
-   * @param <WriteT> The writer results type (e.g., the bundle's output filename, as String)
-   */
-  public abstract static class Writer<T, WriteT> {
-    /**
-     * Performs bundle initialization. For example, creates a temporary file for writing or
-     * initializes any state that will be used across calls to {@link Writer#write}.
-     *
-     * <p>The unique id that is given to open should be used to ensure that the writer's output does
-     * not interfere with the output of other Writers, as a bundle may be executed many times for
-     * fault tolerance. See {@link Sink} for more information about bundle ids.
-     *
-     * <p>The window and paneInfo arguments are populated when windowed writes are requested.
-     * shard and numbShards are populated for the case of static sharding. In cases where the
-     * runner is dynamically picking sharding, shard and numShards might both be set to -1.
-     */
-    public abstract void openWindowed(String uId,
-                                      BoundedWindow window,
-                                      PaneInfo paneInfo,
-                                      int shard,
-                                      int numShards) throws Exception;
-
-    /**
-     * Perform bundle initialization for the case where the file is written unwindowed.
-     */
-    public abstract void openUnwindowed(String uId,
-                                        int shard,
-                                        int numShards) throws Exception;
-
-    public abstract void cleanup() throws Exception;
-
-    /**
-     * Called for each value in the bundle.
-     */
-    public abstract void write(T value) throws Exception;
-
-    /**
-     * Finishes writing the bundle. Closes any resources used for writing the bundle.
-     *
-     * <p>Returns a writer result that will be used in the {@link Sink.WriteOperation}'s
-     * finalization. The result should contain some way to identify the output of this bundle (using
-     * the bundle id). {@link WriteOperation#finalize} will use the writer result to identify
-     * successful writes. See {@link Sink} for more information about bundle ids.
-     *
-     * @return the writer result
-     */
-    public abstract WriteT close() throws Exception;
-
-    /**
-     * Returns the write operation this writer belongs to.
-     */
-    public abstract WriteOperation<T, WriteT> getWriteOperation();
-
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java
deleted file mode 100644
index fd05a19..0000000
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import java.io.IOException;
-import javax.annotation.Nullable;
-import org.apache.hadoop.security.UserGroupInformation;
-
-/**
- * {@link UserGroupInformation} helper methods.
- */
-public class UGIHelper {
-
-  /**
-   * Find the most appropriate UserGroupInformation to use.
-   * @param username the user name, or NULL if none is specified.
-   * @return the most appropriate UserGroupInformation
-   */
-  public static UserGroupInformation getBestUGI(@Nullable String username) throws IOException {
-    return UserGroupInformation.getBestUGI(null, username);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
deleted file mode 100644
index ef6556e..0000000
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
+++ /dev/null
@@ -1,588 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.hdfs.Sink.WriteOperation;
-import org.apache.beam.sdk.io.hdfs.Sink.Writer;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PDone;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class is deprecated, and only exists currently for HDFSFileSink.
- */
-@Deprecated
-public class Write<T> extends PTransform<PCollection<T>, PDone> {
-  private static final Logger LOG = LoggerFactory.getLogger(Write.class);
-
-  private static final int UNKNOWN_SHARDNUM = -1;
-  private static final int UNKNOWN_NUMSHARDS = -1;
-
-  private final Sink<T> sink;
-  // This allows the number of shards to be dynamically computed based on the input
-  // PCollection.
-  @Nullable
-  private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards;
-  // We don't use a side input for static sharding, as we want this value to be updatable
-  // when a pipeline is updated.
-  @Nullable
-  private final ValueProvider<Integer> numShardsProvider;
-  private boolean windowedWrites;
-
-  /**
-   * Creates a {@link Write} transform that writes to the given {@link Sink}, letting the runner
-   * control how many different shards are produced.
-   */
-  public static <T> Write<T> to(Sink<T> sink) {
-    checkNotNull(sink, "sink");
-    return new Write<>(sink, null /* runner-determined sharding */, null, false);
-  }
-
-  private Write(
-      Sink<T> sink,
-      @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards,
-      @Nullable ValueProvider<Integer> numShardsProvider,
-      boolean windowedWrites) {
-    this.sink = sink;
-    this.computeNumShards = computeNumShards;
-    this.numShardsProvider = numShardsProvider;
-    this.windowedWrites = windowedWrites;
-  }
-
-  @Override
-  public PDone expand(PCollection<T> input) {
-    checkArgument(IsBounded.BOUNDED == input.isBounded() || windowedWrites,
-        "%s can only be applied to an unbounded PCollection if doing windowed writes",
-        Write.class.getSimpleName());
-    return createWrite(input, sink.createWriteOperation());
-  }
-
-  @Override
-  public void validate(PipelineOptions options) {
-    sink.validate(options);
-  }
-
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-    super.populateDisplayData(builder);
-    builder
-        .add(DisplayData.item("sink", sink.getClass()).withLabel("Write Sink"))
-        .include("sink", sink);
-    if (getSharding() != null) {
-      builder.include("sharding", getSharding());
-    } else if (getNumShards() != null) {
-      String numShards = getNumShards().isAccessible()
-          ? getNumShards().get().toString() : getNumShards().toString();
-      builder.add(DisplayData.item("numShards", numShards)
-          .withLabel("Fixed Number of Shards"));
-    }
-  }
-
-  /**
-   * Returns the {@link Sink} associated with this PTransform.
-   */
-  public Sink<T> getSink() {
-    return sink;
-  }
-
-  /**
-   * Gets the {@link PTransform} that will be used to determine sharding. This can be either a
-   * static number of shards (as following a call to {@link #withNumShards(int)}), dynamic (by
-   * {@link #withSharding(PTransform)}), or runner-determined (by {@link
-   * #withRunnerDeterminedSharding()}.
-   */
-  @Nullable
-  public PTransform<PCollection<T>, PCollectionView<Integer>> getSharding() {
-    return computeNumShards;
-  }
-
-  public ValueProvider<Integer> getNumShards() {
-    return numShardsProvider;
-  }
-
-  /**
-   * Returns a new {@link Write} that will write to the current {@link Sink} using the
-   * specified number of shards.
-   *
-   * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
-   * more information.
-   *
-   * <p>A value less than or equal to 0 will be equivalent to the default behavior of
-   * runner-determined sharding.
-   */
-  public Write<T> withNumShards(int numShards) {
-    if (numShards > 0) {
-      return withNumShards(StaticValueProvider.of(numShards));
-    }
-    return withRunnerDeterminedSharding();
-  }
-
-  /**
-   * Returns a new {@link Write} that will write to the current {@link Sink} using the
-   * {@link ValueProvider} specified number of shards.
-   *
-   * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
-   * more information.
-   */
-  public Write<T> withNumShards(ValueProvider<Integer> numShardsProvider) {
-    return new Write<>(sink, null, numShardsProvider, windowedWrites);
-  }
-
-  /**
-   * Returns a new {@link Write} that will write to the current {@link Sink} using the
-   * specified {@link PTransform} to compute the number of shards.
-   *
-   * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
-   * more information.
-   */
-  public Write<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) {
-    checkNotNull(
-        sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead");
-    return new Write<>(sink, sharding, null, windowedWrites);
-  }
-
-  /**
-   * Returns a new {@link Write} that will write to the current {@link Sink} with
-   * runner-determined sharding.
-   */
-  public Write<T> withRunnerDeterminedSharding() {
-    return new Write<>(sink, null, null, windowedWrites);
-  }
-
-  /**
-   * Returns a new {@link Write} that writes preserves windowing on it's input.
-   *
-   * <p>If this option is not specified, windowing and triggering are replaced by
-   * {@link GlobalWindows} and {@link DefaultTrigger}.
-   *
-   * <p>If there is no data for a window, no output shards will be generated for that window.
-   * If a window triggers multiple times, then more than a single output shard might be
-   * generated multiple times; it's up to the sink implementation to keep these output shards
-   * unique.
-   *
-   * <p>This option can only be used if {@link #withNumShards(int)} is also set to a
-   * positive value.
-   */
-  public Write<T> withWindowedWrites() {
-    return new Write<>(sink, computeNumShards, numShardsProvider, true);
-  }
-
-  /**
-   * Writes all the elements in a bundle using a {@link Writer} produced by the
-   * {@link WriteOperation} associated with the {@link Sink}.
-   */
-  private class WriteBundles<WriteT> extends DoFn<T, WriteT> {
-    // Writer that will write the records in this bundle. Lazily
-    // initialized in processElement.
-    private Writer<T, WriteT> writer = null;
-    private BoundedWindow window;
-    private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
-
-    WriteBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
-      this.writeOperationView = writeOperationView;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
-      // Lazily initialize the Writer
-      if (writer == null) {
-        WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
-        LOG.info("Opening writer for write operation {}", writeOperation);
-        writer = writeOperation.createWriter(c.getPipelineOptions());
-
-        if (windowedWrites) {
-          writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), UNKNOWN_SHARDNUM,
-              UNKNOWN_NUMSHARDS);
-        } else {
-          writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS);
-        }
-        this.window = window;
-        LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
-      }
-      try {
-        writer.write(c.element());
-      } catch (Exception e) {
-        // Discard write result and close the write.
-        try {
-          writer.close();
-          // The writer does not need to be reset, as this DoFn cannot be reused.
-        } catch (Exception closeException) {
-          if (closeException instanceof InterruptedException) {
-            // Do not silently ignore interrupted state.
-            Thread.currentThread().interrupt();
-          }
-          // Do not mask the exception that caused the write to fail.
-          e.addSuppressed(closeException);
-        }
-        throw e;
-      }
-    }
-
-    @FinishBundle
-    public void finishBundle(FinishBundleContext c) throws Exception {
-      if (writer != null) {
-        WriteT result = writer.close();
-        c.output(result, window.maxTimestamp(), window);
-        // Reset state in case of reuse.
-        writer = null;
-        window = null;
-      }
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      builder.delegate(Write.this);
-    }
-  }
-
-  /**
-   * Like {@link WriteBundles}, but where the elements for each shard have been collected into
-   * a single iterable.
-   *
-   * @see WriteBundles
-   */
-  private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> {
-    private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
-    private final PCollectionView<Integer> numShardsView;
-
-    WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView,
-                        PCollectionView<Integer> numShardsView) {
-      this.writeOperationView = writeOperationView;
-      this.numShardsView = numShardsView;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
-      int numShards = numShardsView != null ? c.sideInput(numShardsView) : getNumShards().get();
-      // In a sharded write, single input element represents one shard. We can open and close
-      // the writer in each call to processElement.
-      WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
-      LOG.info("Opening writer for write operation {}", writeOperation);
-      Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
-      if (windowedWrites) {
-        writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey(),
-            numShards);
-      } else {
-        writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS);
-      }
-      LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
-
-      try {
-        try {
-          for (T t : c.element().getValue()) {
-            writer.write(t);
-          }
-        } catch (Exception e) {
-          try {
-            writer.close();
-          } catch (Exception closeException) {
-            if (closeException instanceof InterruptedException) {
-              // Do not silently ignore interrupted state.
-              Thread.currentThread().interrupt();
-            }
-            // Do not mask the exception that caused the write to fail.
-            e.addSuppressed(closeException);
-          }
-          throw e;
-        }
-
-        // Close the writer; if this throws let the error propagate.
-        WriteT result = writer.close();
-        c.output(result);
-      } catch (Exception e) {
-        // If anything goes wrong, make sure to delete the temporary file.
-        writer.cleanup();
-        throw e;
-      }
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      builder.delegate(Write.this);
-    }
-  }
-
-  private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> {
-    private final PCollectionView<Integer> numShardsView;
-    private final ValueProvider<Integer> numShardsProvider;
-    private int shardNumber;
-
-    ApplyShardingKey(PCollectionView<Integer> numShardsView,
-                     ValueProvider<Integer> numShardsProvider) {
-      this.numShardsView = numShardsView;
-      this.numShardsProvider = numShardsProvider;
-      shardNumber = UNKNOWN_SHARDNUM;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext context) {
-      int shardCount = 0;
-      if (numShardsView != null) {
-        shardCount = context.sideInput(numShardsView);
-      } else {
-        checkNotNull(numShardsProvider);
-        shardCount = numShardsProvider.get();
-      }
-      checkArgument(
-          shardCount > 0,
-          "Must have a positive number of shards specified for non-runner-determined sharding."
-              + " Got %s",
-          shardCount);
-      if (shardNumber == UNKNOWN_SHARDNUM) {
-        // We want to desynchronize the first record sharding key for each instance of
-        // ApplyShardingKey, so records in a small PCollection will be statistically balanced.
-        shardNumber = ThreadLocalRandom.current().nextInt(shardCount);
-      } else {
-        shardNumber = (shardNumber + 1) % shardCount;
-      }
-      context.output(KV.of(shardNumber, context.element()));
-    }
-  }
-
-  /**
-   * A write is performed as sequence of three {@link ParDo}'s.
-   *
-   * <p>In the first, a do-once ParDo is applied to a singleton PCollection containing the Sink's
-   * {@link WriteOperation}. In this initialization ParDo, {@link WriteOperation#initialize} is
-   * called. The output of this ParDo is a singleton PCollection
-   * containing the WriteOperation.
-   *
-   * <p>This singleton collection containing the WriteOperation is then used as a side input to a
-   * ParDo over the PCollection of elements to write. In this bundle-writing phase,
-   * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
-   * {@link Writer#open} and {@link Writer#close} are called in {@link DoFn#startBundle} and
-   * {@link DoFn#finishBundle}, respectively, and {@link Writer#write} method is called for
-   * every element in the bundle. The output of this ParDo is a PCollection of
-   * <i>writer result</i> objects (see {@link Sink} for a description of writer results)-one for
-   * each bundle.
-   *
-   * <p>The final do-once ParDo uses the singleton collection of the WriteOperation as input and
-   * the collection of writer results as a side-input. In this ParDo,
-   * {@link WriteOperation#finalize} is called to finalize the write.
-   *
-   * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called
-   * before the exception that caused the write to fail is propagated and the write result will be
-   * discarded.
-   *
-   * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and
-   * deserialized in the bundle-writing and finalization phases, any state change to the
-   * WriteOperation object that occurs during initialization is visible in the latter phases.
-   * However, the WriteOperation is not serialized after the bundle-writing phase.  This is why
-   * implementations should guarantee that {@link WriteOperation#createWriter} does not mutate
-   * WriteOperation).
-   */
-  private <WriteT> PDone createWrite(
-      PCollection<T> input, WriteOperation<T, WriteT> writeOperation) {
-    Pipeline p = input.getPipeline();
-    writeOperation.setWindowedWrites(windowedWrites);
-
-    // A coder to use for the WriteOperation.
-    @SuppressWarnings("unchecked")
-    Coder<WriteOperation<T, WriteT>> operationCoder =
-        (Coder<WriteOperation<T, WriteT>>) SerializableCoder.of(writeOperation.getClass());
-
-    // A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize
-    // the sink.
-    PCollection<WriteOperation<T, WriteT>> operationCollection =
-        p.apply("CreateOperationCollection", Create.of(writeOperation).withCoder(operationCoder));
-
-    // Initialize the resource in a do-once ParDo on the WriteOperation.
-    operationCollection = operationCollection
-        .apply("Initialize", ParDo.of(
-            new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
-              @ProcessElement
-              public void processElement(ProcessContext c) throws Exception {
-                WriteOperation<T, WriteT> writeOperation = c.element();
-                LOG.info("Initializing write operation {}", writeOperation);
-                writeOperation.initialize(c.getPipelineOptions());
-                writeOperation.setWindowedWrites(windowedWrites);
-                LOG.debug("Done initializing write operation {}", writeOperation);
-                // The WriteOperation is also the output of this ParDo, so it can have mutable
-                // state.
-                c.output(writeOperation);
-              }
-            }))
-        .setCoder(operationCoder);
-
-    // Create a view of the WriteOperation to be used as a sideInput to the parallel write phase.
-    final PCollectionView<WriteOperation<T, WriteT>> writeOperationView =
-        operationCollection.apply(View.<WriteOperation<T, WriteT>>asSingleton());
-
-    if (!windowedWrites) {
-      // Re-window the data into the global window and remove any existing triggers.
-      input =
-          input.apply(
-              Window.<T>into(new GlobalWindows())
-                  .triggering(DefaultTrigger.of())
-                  .discardingFiredPanes());
-    }
-
-
-    // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation
-    // as a side input) and collect the results of the writes in a PCollection.
-    // There is a dependency between this ParDo and the first (the WriteOperation PCollection
-    // as a side input), so this will happen after the initial ParDo.
-    PCollection<WriteT> results;
-    final PCollectionView<Integer> numShardsView;
-    if (computeNumShards == null && numShardsProvider == null) {
-      if (windowedWrites) {
-        throw new IllegalStateException("When doing windowed writes, numShards must be set"
-            + "explicitly to a positive value");
-      }
-      numShardsView = null;
-      results = input
-          .apply("WriteBundles",
-              ParDo.of(new WriteBundles<>(writeOperationView))
-                  .withSideInputs(writeOperationView));
-    } else {
-      if (computeNumShards != null) {
-        numShardsView = input.apply(computeNumShards);
-        results  = input
-            .apply("ApplyShardLabel", ParDo.of(
-                new ApplyShardingKey<T>(numShardsView, null)).withSideInputs(numShardsView))
-            .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
-            .apply("WriteShardedBundles",
-                ParDo.of(new WriteShardedBundles<>(writeOperationView, numShardsView))
-                    .withSideInputs(numShardsView, writeOperationView));
-      } else {
-        numShardsView = null;
-        results = input
-            .apply("ApplyShardLabel", ParDo.of(new ApplyShardingKey<T>(null, numShardsProvider)))
-            .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
-            .apply("WriteShardedBundles",
-                ParDo.of(new WriteShardedBundles<>(writeOperationView, null))
-                    .withSideInputs(writeOperationView));
-      }
-    }
-    results.setCoder(writeOperation.getWriterResultCoder());
-
-    if (windowedWrites) {
-      // When processing streaming windowed writes, results will arrive multiple times. This
-      // means we can't share the below implementation that turns the results into a side input,
-      // as new data arriving into a side input does not trigger the listening DoFn. Instead
-      // we aggregate the result set using a singleton GroupByKey, so the DoFn will be triggered
-      // whenever new data arrives.
-      PCollection<KV<Void, WriteT>> keyedResults =
-          results.apply("AttachSingletonKey", WithKeys.<Void, WriteT>of((Void) null));
-      keyedResults.setCoder(KvCoder.<Void, WriteT>of(VoidCoder.of(), writeOperation
-          .getWriterResultCoder()));
-
-      // Is the continuation trigger sufficient?
-      keyedResults
-          .apply("FinalizeGroupByKey", GroupByKey.<Void, WriteT>create())
-          .apply("Finalize", ParDo.of(new DoFn<KV<Void, Iterable<WriteT>>, Integer>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) throws Exception {
-              WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
-              LOG.info("Finalizing write operation {}.", writeOperation);
-              List<WriteT> results = Lists.newArrayList(c.element().getValue());
-              writeOperation.finalize(results, c.getPipelineOptions());
-              LOG.debug("Done finalizing write operation {}", writeOperation);
-            }
-          }).withSideInputs(writeOperationView));
-    } else {
-      final PCollectionView<Iterable<WriteT>> resultsView =
-          results.apply(View.<WriteT>asIterable());
-      ImmutableList.Builder<PCollectionView<?>> sideInputs =
-          ImmutableList.<PCollectionView<?>>builder().add(resultsView);
-      if (numShardsView != null) {
-        sideInputs.add(numShardsView);
-      }
-
-      // Finalize the write in another do-once ParDo on the singleton collection containing the
-      // Writer. The results from the per-bundle writes are given as an Iterable side input.
-      // The WriteOperation's state is the same as after its initialization in the first do-once
-      // ParDo. There is a dependency between this ParDo and the parallel write (the writer
-      // results collection as a side input), so it will happen after the parallel write.
-      // For the non-windowed case, we guarantee that  if no data is written but the user has
-      // set numShards, then all shards will be written out as empty files. For this reason we
-      // use a side input here.
-      operationCollection
-          .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) throws Exception {
-              WriteOperation<T, WriteT> writeOperation = c.element();
-              LOG.info("Finalizing write operation {}.", writeOperation);
-              List<WriteT> results = Lists.newArrayList(c.sideInput(resultsView));
-              LOG.debug("Side input initialized to finalize write operation {}.", writeOperation);
-
-              // We must always output at least 1 shard, and honor user-specified numShards if
-              // set.
-              int minShardsNeeded;
-              if (numShardsView != null) {
-                minShardsNeeded = c.sideInput(numShardsView);
-              } else if (numShardsProvider != null) {
-                minShardsNeeded = numShardsProvider.get();
-              } else {
-                minShardsNeeded = 1;
-              }
-              int extraShardsNeeded = minShardsNeeded - results.size();
-              if (extraShardsNeeded > 0) {
-                LOG.info(
-                    "Creating {} empty output shards in addition to {} written for a total of "
-                        + " {}.", extraShardsNeeded, results.size(), minShardsNeeded);
-                for (int i = 0; i < extraShardsNeeded; ++i) {
-                  Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
-                  writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM,
-                      UNKNOWN_NUMSHARDS);
-                  WriteT emptyWrite = writer.close();
-                  results.add(emptyWrite);
-                }
-                LOG.debug("Done creating extra shards.");
-              }
-              writeOperation.finalize(results, c.getPipelineOptions());
-              LOG.debug("Done finalizing write operation {}", writeOperation);
-            }
-          }).withSideInputs(sideInputs.build()));
-    }
-    return PDone.in(input.getPipeline());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java
index 763b30a..32c36cc 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java
@@ -17,6 +17,7 @@
  */
 
 /**
- * Transforms used to read from the Hadoop file system (HDFS).
+ * {@link org.apache.beam.sdk.io.FileSystem} implementation for any Hadoop
+ * {@link org.apache.hadoop.fs.FileSystem}.
  */
 package org.apache.beam.sdk.io.hdfs;