Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/04 19:53:27 UTC

[02/50] [abbrv] beam git commit: [BEAM-2135] Move hdfs to hadoop-file-system

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
deleted file mode 100644
index aee73c4..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
+++ /dev/null
@@ -1,478 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import java.io.IOException;
-import java.net.URI;
-import java.security.PrivilegedExceptionAction;
-import java.util.Random;
-import java.util.Set;
-import javax.annotation.Nullable;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapreduce.AvroKeyOutputFormat;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.values.KV;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-
-/**
- * A {@link Sink} for writing records to a Hadoop filesystem using a Hadoop file-based
- * output format.
- *
- * <p>To write a {@link org.apache.beam.sdk.values.PCollection} of elements of type T to a Hadoop
- * filesystem, use {@link HDFSFileSink#to} and specify the path (this can be any Hadoop-supported
- * filesystem: HDFS, S3, GCS, etc.), the Hadoop {@link FileOutputFormat}, the key class K, the
- * value class V, and finally the {@link SerializableFunction} that maps from T to a {@link KV}
- * of K and V.
- *
- * <p>{@code HDFSFileSink} can be used with {@link Write} to create a write
- * transform. See the example below.
- *
- * <p>{@code HDFSFileSink} comes with helper methods to write text and Apache Avro. For example:
- *
- * <pre>
- * {@code
- * HDFSFileSink<CustomSpecificAvroClass, AvroKey<CustomSpecificAvroClass>, NullWritable> sink =
- *   HDFSFileSink.toAvro(path, AvroCoder.of(CustomSpecificAvroClass.class), conf);
- * avroRecordsPCollection.apply(Write.to(sink));
- * }
- * </pre>
- *
- * @param <T> the type of elements of the input {@link org.apache.beam.sdk.values.PCollection}.
- * @param <K> the type of keys to be written to the sink via {@link FileOutputFormat}.
- * @param <V> the type of values to be written to the sink via {@link FileOutputFormat}.
- */
-@AutoValue
-@Experimental
-public abstract class HDFSFileSink<T, K, V> extends Sink<T> {
-
-  private static final JobID jobId = new JobID(
-      Long.toString(System.currentTimeMillis()),
-      new Random().nextInt(Integer.MAX_VALUE));
-
-  public abstract String path();
-  public abstract Class<? extends FileOutputFormat<K, V>> formatClass();
-  public abstract Class<K> keyClass();
-  public abstract Class<V> valueClass();
-  public abstract SerializableFunction<T, KV<K, V>> outputConverter();
-  public abstract SerializableConfiguration serializableConfiguration();
-  public @Nullable abstract String username();
-  public abstract boolean validate();
-
-  // =======================================================================
-  // Factory methods
-  // =======================================================================
-
-  public static <T, K, V, W extends FileOutputFormat<K, V>> HDFSFileSink<T, K, V>
-  to(String path,
-     Class<W> formatClass,
-     Class<K> keyClass,
-     Class<V> valueClass,
-     SerializableFunction<T, KV<K, V>> outputConverter) {
-    return HDFSFileSink.<T, K, V>builder()
-        .setPath(path)
-        .setFormatClass(formatClass)
-        .setKeyClass(keyClass)
-        .setValueClass(valueClass)
-        .setOutputConverter(outputConverter)
-        .setConfiguration(null)
-        .setUsername(null)
-        .setValidate(true)
-        .build();
-  }
-
-  public static <T> HDFSFileSink<T, NullWritable, Text> toText(String path) {
-    SerializableFunction<T, KV<NullWritable, Text>> outputConverter =
-        new SerializableFunction<T, KV<NullWritable, Text>>() {
-          @Override
-          public KV<NullWritable, Text> apply(T input) {
-            return KV.of(NullWritable.get(), new Text(input.toString()));
-          }
-        };
-    return to(path, TextOutputFormat.class, NullWritable.class, Text.class, outputConverter);
-  }
-
-  /**
-   * Helper to create an Avro sink given an {@link AvroCoder}. Keep in mind that the
-   * configuration object is altered to enable Avro output.
-   */
-  public static <T> HDFSFileSink<T, AvroKey<T>, NullWritable> toAvro(String path,
-                                                                     final AvroCoder<T> coder,
-                                                                     Configuration conf) {
-    SerializableFunction<T, KV<AvroKey<T>, NullWritable>> outputConverter =
-        new SerializableFunction<T, KV<AvroKey<T>, NullWritable>>() {
-          @Override
-          public KV<AvroKey<T>, NullWritable> apply(T input) {
-            return KV.of(new AvroKey<>(input), NullWritable.get());
-          }
-        };
-    conf.set("avro.schema.output.key", coder.getSchema().toString());
-    return to(
-        path,
-        AvroKeyOutputFormat.class,
-        (Class<AvroKey<T>>) (Class<?>) AvroKey.class,
-        NullWritable.class,
-        outputConverter).withConfiguration(conf);
-  }
-
-  /**
-   * Helper to create an Avro sink given a {@link Schema}. Keep in mind that the configuration
-   * object is altered to enable Avro output.
-   */
-  public static HDFSFileSink<GenericRecord, AvroKey<GenericRecord>, NullWritable>
-  toAvro(String path, Schema schema, Configuration conf) {
-    return toAvro(path, AvroCoder.of(schema), conf);
-  }
-
-  /**
-   * Helper to create an Avro sink given a {@link Class}. Keep in mind that the configuration
-   * object is altered to enable Avro output.
-   */
-  public static <T> HDFSFileSink<T, AvroKey<T>, NullWritable> toAvro(String path,
-                                                                     Class<T> cls,
-                                                                     Configuration conf) {
-    return toAvro(path, AvroCoder.of(cls), conf);
-  }
-
-  // =======================================================================
-  // Builder methods
-  // =======================================================================
-
-  public abstract Builder<T, K, V> toBuilder();
-  public static <T, K, V> Builder<T, K, V> builder() {
-    return new AutoValue_HDFSFileSink.Builder<>();
-  }
-
-  /**
-   * AutoValue builder for {@link HDFSFileSink}.
-   */
-  @AutoValue.Builder
-  public abstract static class Builder<T, K, V> {
-    public abstract Builder<T, K, V> setPath(String path);
-    public abstract Builder<T, K, V> setFormatClass(
-        Class<? extends FileOutputFormat<K, V>> formatClass);
-    public abstract Builder<T, K, V> setKeyClass(Class<K> keyClass);
-    public abstract Builder<T, K, V> setValueClass(Class<V> valueClass);
-    public abstract Builder<T, K, V> setOutputConverter(
-        SerializableFunction<T, KV<K, V>> outputConverter);
-    public abstract Builder<T, K, V> setSerializableConfiguration(
-        SerializableConfiguration serializableConfiguration);
-    public Builder<T, K, V> setConfiguration(@Nullable Configuration configuration) {
-      if (configuration == null) {
-        configuration = new Configuration(false);
-      }
-      return this.setSerializableConfiguration(new SerializableConfiguration(configuration));
-    }
-    public abstract Builder<T, K, V> setUsername(@Nullable String username);
-    public abstract Builder<T, K, V> setValidate(boolean validate);
-    public abstract HDFSFileSink<T, K, V> build();
-  }
-
-  public HDFSFileSink<T, K, V> withConfiguration(@Nullable Configuration configuration) {
-    return this.toBuilder().setConfiguration(configuration).build();
-  }
-
-  public HDFSFileSink<T, K, V> withUsername(@Nullable String username) {
-    return this.toBuilder().setUsername(username).build();
-  }
-
-  // =======================================================================
-  // Sink
-  // =======================================================================
-
-  @Override
-  public void validate(PipelineOptions options) {
-    if (validate()) {
-      try {
-        UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Void>() {
-          @Override
-          public Void run() throws Exception {
-            FileSystem fs = FileSystem.get(new URI(path()),
-                SerializableConfiguration.newConfiguration(serializableConfiguration()));
-            checkState(!fs.exists(new Path(path())), "Output path %s already exists", path());
-            return null;
-          }
-        });
-      } catch (IOException | InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  @Override
-  public Sink.WriteOperation<T, String> createWriteOperation() {
-    return new HDFSWriteOperation<>(this, path(), formatClass());
-  }
-
-  private Job newJob() throws IOException {
-    Job job = SerializableConfiguration.newJob(serializableConfiguration());
-    job.setJobID(jobId);
-    job.setOutputKeyClass(keyClass());
-    job.setOutputValueClass(valueClass());
-    return job;
-  }
-
-  // =======================================================================
-  // WriteOperation
-  // =======================================================================
-
-  /** {@link WriteOperation} for HDFS. */
-  private static class HDFSWriteOperation<T, K, V> extends WriteOperation<T, String> {
-
-    private final HDFSFileSink<T, K, V> sink;
-    private final String path;
-    private final Class<? extends FileOutputFormat<K, V>> formatClass;
-
-    HDFSWriteOperation(HDFSFileSink<T, K, V> sink,
-                       String path,
-                       Class<? extends FileOutputFormat<K, V>> formatClass) {
-      this.sink = sink;
-      this.path = path;
-      this.formatClass = formatClass;
-    }
-
-    @Override
-    public void initialize(PipelineOptions options) throws Exception {
-      Job job = sink.newJob();
-      FileOutputFormat.setOutputPath(job, new Path(path));
-    }
-
-    @Override
-    public void setWindowedWrites(boolean windowedWrites) {
-    }
-
-    @Override
-    public void finalize(final Iterable<String> writerResults, PipelineOptions options)
-        throws Exception {
-      UGIHelper.getBestUGI(sink.username()).doAs(new PrivilegedExceptionAction<Void>() {
-        @Override
-        public Void run() throws Exception {
-          doFinalize(writerResults);
-          return null;
-        }
-      });
-    }
-
-    private void doFinalize(Iterable<String> writerResults) throws Exception {
-      Job job = sink.newJob();
-      FileSystem fs = FileSystem.get(new URI(path), job.getConfiguration());
-      // If there are 0 output shards, just create output folder.
-      if (!writerResults.iterator().hasNext()) {
-        fs.mkdirs(new Path(path));
-        return;
-      }
-
-      // job successful
-      JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
-      FileOutputCommitter outputCommitter = new FileOutputCommitter(new Path(path), context);
-      outputCommitter.commitJob(context);
-
-      // get actual output shards
-      Set<String> actual = Sets.newHashSet();
-      FileStatus[] statuses = fs.listStatus(new Path(path), new PathFilter() {
-        @Override
-        public boolean accept(Path path) {
-          String name = path.getName();
-          return !name.startsWith("_") && !name.startsWith(".");
-        }
-      });
-
-      // get expected output shards
-      Set<String> expected = Sets.newHashSet(writerResults);
-      checkState(
-          expected.size() == Lists.newArrayList(writerResults).size(),
-          "Data loss due to writer results hash collision");
-      for (FileStatus s : statuses) {
-        String name = s.getPath().getName();
-        int pos = name.indexOf('.');
-        actual.add(pos > 0 ? name.substring(0, pos) : name);
-      }
-
-      checkState(actual.equals(expected), "Writer results and output files do not match");
-
-      // rename output shards to Hadoop style, i.e. part-r-00000.txt
-      int i = 0;
-      for (FileStatus s : statuses) {
-        String name = s.getPath().getName();
-        int pos = name.indexOf('.');
-        String ext = pos > 0 ? name.substring(pos) : "";
-        fs.rename(
-            s.getPath(),
-            new Path(s.getPath().getParent(), String.format("part-r-%05d%s", i, ext)));
-        i++;
-      }
-    }
-
-    @Override
-    public Writer<T, String> createWriter(PipelineOptions options) throws Exception {
-      return new HDFSWriter<>(this, path, formatClass);
-    }
-
-    @Override
-    public Sink<T> getSink() {
-      return sink;
-    }
-
-    @Override
-    public Coder<String> getWriterResultCoder() {
-      return StringUtf8Coder.of();
-    }
-
-  }
-
-  // =======================================================================
-  // Writer
-  // =======================================================================
-
-  private static class HDFSWriter<T, K, V> extends Writer<T, String> {
-
-    private final HDFSWriteOperation<T, K, V> writeOperation;
-    private final String path;
-    private final Class<? extends FileOutputFormat<K, V>> formatClass;
-
-    // unique hash for each task
-    private int hash;
-
-    private TaskAttemptContext context;
-    private RecordWriter<K, V> recordWriter;
-    private FileOutputCommitter outputCommitter;
-
-    HDFSWriter(HDFSWriteOperation<T, K, V> writeOperation,
-               String path,
-               Class<? extends FileOutputFormat<K, V>> formatClass) {
-      this.writeOperation = writeOperation;
-      this.path = path;
-      this.formatClass = formatClass;
-    }
-
-    @Override
-    public void openWindowed(final String uId,
-                             BoundedWindow window,
-                             PaneInfo paneInfo,
-                             int shard,
-                             int numShards) throws Exception {
-      throw new UnsupportedOperationException("Windowing support not implemented yet for"
-          + "HDFS. Window " + window);
-    }
-
-    @Override
-    public void openUnwindowed(final String uId, int shard, int numShards) throws Exception {
-      UGIHelper.getBestUGI(writeOperation.sink.username()).doAs(
-          new PrivilegedExceptionAction<Void>() {
-            @Override
-            public Void run() throws Exception {
-              doOpen(uId);
-              return null;
-            }
-          }
-      );
-    }
-
-    private void doOpen(String uId) throws Exception {
-      this.hash = uId.hashCode();
-
-      Job job = writeOperation.sink.newJob();
-      FileOutputFormat.setOutputPath(job, new Path(path));
-
-      // Each Writer is responsible for writing one bundle of elements and is represented by one
-      // unique Hadoop task based on uId/hash. All tasks share the same job ID.
-      JobID jobId = job.getJobID();
-      TaskID taskId = new TaskID(jobId, TaskType.REDUCE, hash);
-      context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID(taskId, 0));
-
-      FileOutputFormat<K, V> outputFormat = formatClass.newInstance();
-      recordWriter = outputFormat.getRecordWriter(context);
-      outputCommitter = (FileOutputCommitter) outputFormat.getOutputCommitter(context);
-    }
-
-    @Override
-    public void write(T value) throws Exception {
-      checkNotNull(recordWriter,
-          "Record writer can't be null. Make sure to open Writer first!");
-      KV<K, V> kv = writeOperation.sink.outputConverter().apply(value);
-      recordWriter.write(kv.getKey(), kv.getValue());
-    }
-
-    @Override
-    public void cleanup() throws Exception {
-
-    }
-
-    @Override
-    public String close() throws Exception {
-      return UGIHelper.getBestUGI(writeOperation.sink.username()).doAs(
-          new PrivilegedExceptionAction<String>() {
-            @Override
-            public String run() throws Exception {
-              return doClose();
-            }
-          });
-    }
-
-    private String doClose() throws Exception {
-      // task/attempt successful
-      recordWriter.close(context);
-      outputCommitter.commitTask(context);
-
-      // result is prefix of the output file name
-      return String.format("part-r-%d", hash);
-    }
-
-    @Override
-    public WriteOperation<T, String> getWriteOperation() {
-      return writeOperation;
-    }
-
-  }
-
-}
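
For context on the class being moved here, below is a minimal usage sketch of HDFSFileSink. It is illustrative only and not part of this commit: the namenode address, output path, and input elements are hypothetical placeholders, and the sketch is written as if it lived in the org.apache.beam.sdk.io.hdfs package so that the Sink-based Write transform referenced by the Javadoc resolves without guessing its fully qualified name.

    package org.apache.beam.sdk.io.hdfs;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;

    public class HdfsSinkSketch {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Hypothetical cluster address; an empty Configuration is also accepted (see setConfiguration).
        Configuration conf = new Configuration(false);
        conf.set("fs.defaultFS", "hdfs://namenode:8020");

        HDFSFileSink<String, NullWritable, Text> sink =
            HDFSFileSink.<String>toText("hdfs://namenode:8020/output/lines")
                .withConfiguration(conf);

        pipeline
            .apply(Create.of("alpha", "beta", "gamma"))
            // Write is the Sink-based write transform that HDFSFileSink was designed for.
            .apply(Write.to(sink));

        pipeline.run();
      }
    }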

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
deleted file mode 100644
index 5cc2097..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
+++ /dev/null
@@ -1,625 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.security.PrivilegedExceptionAction;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapreduce.AvroKeyInputFormat;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
-import org.apache.beam.sdk.io.hadoop.WritableCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.values.KV;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@code BoundedSource} for reading files resident in a Hadoop filesystem using a
- * Hadoop file-based input format.
- *
- * <p>To read a {@link org.apache.beam.sdk.values.PCollection} of
- * {@link org.apache.beam.sdk.values.KV} key-value pairs from one or more
- * Hadoop files, use {@link HDFSFileSource#from} to specify the path(s) of the files to
- * read, the Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}, the
- * key class and the value class.
- *
- * <p>An {@code HDFSFileSource} can be read using the
- * {@link org.apache.beam.sdk.io.Read} transform. For example:
- *
- * <pre>
- * {@code
- * HDFSFileSource<KV<MyKey, MyValue>, MyKey, MyValue> source = HDFSFileSource.from(
- *   path, MyInputFormat.class, MyKey.class, MyValue.class);
- * PCollection<KV<MyKey, MyValue>> records = pipeline.apply(Read.from(source));
- * }
- * </pre>
- *
- * <p>Implementation note: Since Hadoop's
- * {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}
- * determines the input splits, this class extends {@link BoundedSource} rather than
- * {@link org.apache.beam.sdk.io.OffsetBasedSource}, which dictates its own input splits.
- *
- * @param <T> the type of elements of the result {@link org.apache.beam.sdk.values.PCollection}.
- * @param <K> the type of keys to be read from the source via {@link FileInputFormat}.
- * @param <V> the type of values to be read from the source via {@link FileInputFormat}.
- */
-@AutoValue
-@Experimental
-public abstract class HDFSFileSource<T, K, V> extends BoundedSource<T> {
-  private static final long serialVersionUID = 0L;
-
-  private static final Logger LOG = LoggerFactory.getLogger(HDFSFileSource.class);
-
-  public abstract String filepattern();
-  public abstract Class<? extends FileInputFormat<K, V>> formatClass();
-  public abstract Coder<T> coder();
-  public abstract SerializableFunction<KV<K, V>, T> inputConverter();
-  public abstract SerializableConfiguration serializableConfiguration();
-  public @Nullable abstract SerializableSplit serializableSplit();
-  public @Nullable abstract String username();
-  public abstract boolean validateSource();
-
-  // =======================================================================
-  // Factory methods
-  // =======================================================================
-
-  public static <T, K, V, W extends FileInputFormat<K, V>> HDFSFileSource<T, K, V>
-  from(String filepattern,
-       Class<W> formatClass,
-       Coder<T> coder,
-       SerializableFunction<KV<K, V>, T> inputConverter) {
-    return HDFSFileSource.<T, K, V>builder()
-        .setFilepattern(filepattern)
-        .setFormatClass(formatClass)
-        .setCoder(coder)
-        .setInputConverter(inputConverter)
-        .setConfiguration(null)
-        .setUsername(null)
-        .setValidateSource(true)
-        .setSerializableSplit(null)
-        .build();
-  }
-
-  public static <K, V, W extends FileInputFormat<K, V>> HDFSFileSource<KV<K, V>, K, V>
-  from(String filepattern,
-       Class<W> formatClass,
-       Class<K> keyClass,
-       Class<V> valueClass) {
-    KvCoder<K, V> coder = KvCoder.of(getDefaultCoder(keyClass), getDefaultCoder(valueClass));
-    SerializableFunction<KV<K, V>, KV<K, V>> inputConverter =
-        new SerializableFunction<KV<K, V>, KV<K, V>>() {
-          @Override
-          public KV<K, V> apply(KV<K, V> input) {
-            return input;
-          }
-        };
-    return HDFSFileSource.<KV<K, V>, K, V>builder()
-        .setFilepattern(filepattern)
-        .setFormatClass(formatClass)
-        .setCoder(coder)
-        .setInputConverter(inputConverter)
-        .setConfiguration(null)
-        .setUsername(null)
-        .setValidateSource(true)
-        .setSerializableSplit(null)
-        .build();
-  }
-
-  public static HDFSFileSource<String, LongWritable, Text>
-  fromText(String filepattern) {
-    SerializableFunction<KV<LongWritable, Text>, String> inputConverter =
-        new SerializableFunction<KV<LongWritable, Text>, String>() {
-          @Override
-          public String apply(KV<LongWritable, Text> input) {
-            return input.getValue().toString();
-          }
-        };
-    return from(filepattern, TextInputFormat.class, StringUtf8Coder.of(), inputConverter);
-  }
-
-  /**
-   * Helper to read from an Avro source given an {@link AvroCoder}. Keep in mind that the
-   * configuration object is altered to enable Avro input.
-   */
-  public static <T> HDFSFileSource<T, AvroKey<T>, NullWritable>
-  fromAvro(String filepattern, final AvroCoder<T> coder, Configuration conf) {
-    Class<AvroKeyInputFormat<T>> formatClass = castClass(AvroKeyInputFormat.class);
-    SerializableFunction<KV<AvroKey<T>, NullWritable>, T> inputConverter =
-        new SerializableFunction<KV<AvroKey<T>, NullWritable>, T>() {
-          @Override
-          public T apply(KV<AvroKey<T>, NullWritable> input) {
-            try {
-              return CoderUtils.clone(coder, input.getKey().datum());
-            } catch (CoderException e) {
-              throw new RuntimeException(e);
-            }
-          }
-        };
-    conf.set("avro.schema.input.key", coder.getSchema().toString());
-    return from(filepattern, formatClass, coder, inputConverter).withConfiguration(conf);
-  }
-
-  /**
-   * Helper to read from an Avro source given a {@link Schema}. Keep in mind that the
-   * configuration object is altered to enable Avro input.
-   */
-  public static HDFSFileSource<GenericRecord, AvroKey<GenericRecord>, NullWritable>
-  fromAvro(String filepattern, Schema schema, Configuration conf) {
-    return fromAvro(filepattern, AvroCoder.of(schema), conf);
-  }
-
-  /**
-   * Helper to read from an Avro source given a {@link Class}. Keep in mind that the
-   * configuration object is altered to enable Avro input.
-   */
-  public static <T> HDFSFileSource<T, AvroKey<T>, NullWritable>
-  fromAvro(String filepattern, Class<T> cls, Configuration conf) {
-    return fromAvro(filepattern, AvroCoder.of(cls), conf);
-  }
-
-  // =======================================================================
-  // Builder methods
-  // =======================================================================
-
-  public abstract HDFSFileSource.Builder<T, K, V> toBuilder();
-  public static <T, K, V> HDFSFileSource.Builder<T, K, V> builder() {
-    return new AutoValue_HDFSFileSource.Builder<>();
-  }
-
-  /**
-   * AutoValue builder for {@link HDFSFileSource}.
-   */
-  @AutoValue.Builder
-  public abstract static class Builder<T, K, V> {
-    public abstract Builder<T, K, V> setFilepattern(String filepattern);
-    public abstract Builder<T, K, V> setFormatClass(
-        Class<? extends FileInputFormat<K, V>> formatClass);
-    public abstract Builder<T, K, V> setCoder(Coder<T> coder);
-    public abstract Builder<T, K, V> setInputConverter(
-        SerializableFunction<KV<K, V>, T> inputConverter);
-    public abstract Builder<T, K, V> setSerializableConfiguration(
-        SerializableConfiguration serializableConfiguration);
-    public Builder<T, K, V> setConfiguration(Configuration configuration) {
-      if (configuration == null) {
-        configuration = new Configuration(false);
-      }
-      return this.setSerializableConfiguration(new SerializableConfiguration(configuration));
-    }
-    public abstract Builder<T, K, V> setSerializableSplit(SerializableSplit serializableSplit);
-    public abstract Builder<T, K, V> setUsername(@Nullable String username);
-    public abstract Builder<T, K, V> setValidateSource(boolean validate);
-    public abstract HDFSFileSource<T, K, V> build();
-  }
-
-  public HDFSFileSource<T, K, V> withConfiguration(@Nullable Configuration configuration) {
-    return this.toBuilder().setConfiguration(configuration).build();
-  }
-
-  public HDFSFileSource<T, K, V> withUsername(@Nullable String username) {
-    return this.toBuilder().setUsername(username).build();
-  }
-
-  // =======================================================================
-  // BoundedSource
-  // =======================================================================
-
-  @Override
-  public List<? extends BoundedSource<T>> split(
-      final long desiredBundleSizeBytes,
-      PipelineOptions options) throws Exception {
-    if (serializableSplit() == null) {
-      List<InputSplit> inputSplits = UGIHelper.getBestUGI(username()).doAs(
-          new PrivilegedExceptionAction<List<InputSplit>>() {
-            @Override
-            public List<InputSplit> run() throws Exception {
-              return computeSplits(desiredBundleSizeBytes, serializableConfiguration());
-            }
-          });
-      return Lists.transform(inputSplits,
-          new Function<InputSplit, BoundedSource<T>>() {
-            @Override
-            public BoundedSource<T> apply(@Nullable InputSplit inputSplit) {
-              SerializableSplit serializableSplit = new SerializableSplit(inputSplit);
-              return HDFSFileSource.this.toBuilder()
-                  .setSerializableSplit(serializableSplit)
-                  .build();
-            }
-          });
-    } else {
-      return ImmutableList.of(this);
-    }
-  }
-
-  @Override
-  public long getEstimatedSizeBytes(PipelineOptions options) {
-    long size = 0;
-
-    try {
-      // If this source represents a single split produced by split(), return the size of
-      // that split rather than the size of the entire input.
-      if (serializableSplit() != null) {
-        return serializableSplit().getSplit().getLength();
-      }
-
-      size += UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Long>() {
-        @Override
-        public Long run() throws Exception {
-          long size = 0;
-          Job job = SerializableConfiguration.newJob(serializableConfiguration());
-          for (FileStatus st : listStatus(createFormat(job), job)) {
-            size += st.getLen();
-          }
-          return size;
-        }
-      });
-    } catch (IOException e) {
-      LOG.warn(
-          "Cannot estimate the size of the input; will report it as 0 bytes, due to:", e);
-      // ignore, and return 0
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOG.warn(
-          "Cannot estimate the size of the input; will report it as 0 bytes, due to:", e);
-      // ignore, and return 0
-    }
-    return size;
-  }
-
-  @Override
-  public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
-    this.validate();
-    return new HDFSFileReader<>(this, filepattern(), formatClass(), serializableSplit());
-  }
-
-  @Override
-  public void validate() {
-    if (validateSource()) {
-      try {
-        UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Void>() {
-          @Override
-          public Void run() throws Exception {
-            final Path pathPattern = new Path(filepattern());
-            FileSystem fs = FileSystem.get(pathPattern.toUri(),
-                SerializableConfiguration.newConfiguration(serializableConfiguration()));
-            FileStatus[] fileStatuses = fs.globStatus(pathPattern);
-            checkState(
-                fileStatuses != null && fileStatuses.length > 0,
-                "Unable to find any files matching %s", filepattern());
-            return null;
-          }
-        });
-      } catch (IOException | InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  @Override
-  public Coder<T> getDefaultOutputCoder() {
-    return coder();
-  }
-
-  // =======================================================================
-  // Helpers
-  // =======================================================================
-
-  private List<InputSplit> computeSplits(long desiredBundleSizeBytes,
-                                         SerializableConfiguration serializableConfiguration)
-      throws IOException, IllegalAccessException, InstantiationException {
-    Job job = SerializableConfiguration.newJob(serializableConfiguration);
-    FileInputFormat.setMinInputSplitSize(job, desiredBundleSizeBytes);
-    FileInputFormat.setMaxInputSplitSize(job, desiredBundleSizeBytes);
-    return createFormat(job).getSplits(job);
-  }
-
-  private FileInputFormat<K, V> createFormat(Job job)
-      throws IOException, IllegalAccessException, InstantiationException {
-    Path path = new Path(filepattern());
-    FileInputFormat.addInputPath(job, path);
-    return formatClass().newInstance();
-  }
-
-  private List<FileStatus> listStatus(FileInputFormat<K, V> format, Job job)
-      throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
-    // FileInputFormat#listStatus is protected, so call using reflection
-    Method listStatus = FileInputFormat.class.getDeclaredMethod("listStatus", JobContext.class);
-    listStatus.setAccessible(true);
-    @SuppressWarnings("unchecked")
-    List<FileStatus> stat = (List<FileStatus>) listStatus.invoke(format, job);
-    return stat;
-  }
-
-  @SuppressWarnings("unchecked")
-  private static <T> Coder<T> getDefaultCoder(Class<T> c) {
-    if (Writable.class.isAssignableFrom(c)) {
-      Class<? extends Writable> writableClass = (Class<? extends Writable>) c;
-      return (Coder<T>) WritableCoder.of(writableClass);
-    } else if (Void.class.equals(c)) {
-      return (Coder<T>) VoidCoder.of();
-    }
-    // TODO: how to use registered coders here?
-    throw new IllegalStateException("Cannot find coder for " + c);
-  }
-
-  @SuppressWarnings("unchecked")
-  private static <T> Class<T> castClass(Class<?> aClass) {
-    return (Class<T>) aClass;
-  }
-
-  // =======================================================================
-  // BoundedReader
-  // =======================================================================
-
-  private static class HDFSFileReader<T, K, V> extends BoundedSource.BoundedReader<T> {
-
-    private final HDFSFileSource<T, K, V> source;
-    private final String filepattern;
-    private final Class<? extends FileInputFormat<K, V>> formatClass;
-    private final Job job;
-
-    private List<InputSplit> splits;
-    private ListIterator<InputSplit> splitsIterator;
-
-    private Configuration conf;
-    private FileInputFormat<?, ?> format;
-    private TaskAttemptContext attemptContext;
-    private RecordReader<K, V> currentReader;
-    private KV<K, V> currentPair;
-
-    HDFSFileReader(
-        HDFSFileSource<T, K, V> source,
-        String filepattern,
-        Class<? extends FileInputFormat<K, V>> formatClass,
-        SerializableSplit serializableSplit)
-        throws IOException {
-      this.source = source;
-      this.filepattern = filepattern;
-      this.formatClass = formatClass;
-      this.job = SerializableConfiguration.newJob(source.serializableConfiguration());
-
-      if (serializableSplit != null) {
-        this.splits = ImmutableList.of(serializableSplit.getSplit());
-        this.splitsIterator = splits.listIterator();
-      }
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      Path path = new Path(filepattern);
-      FileInputFormat.addInputPath(job, path);
-
-      conf = job.getConfiguration();
-      try {
-        format = formatClass.newInstance();
-      } catch (InstantiationException | IllegalAccessException e) {
-        throw new IOException("Cannot instantiate file input format " + formatClass, e);
-      }
-      attemptContext = new TaskAttemptContextImpl(conf, new TaskAttemptID());
-
-      if (splitsIterator == null) {
-        splits = format.getSplits(job);
-        splitsIterator = splits.listIterator();
-      }
-
-      return advance();
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      try {
-        if (currentReader != null && currentReader.nextKeyValue()) {
-          currentPair = nextPair();
-          return true;
-        } else {
-          while (splitsIterator.hasNext()) {
-            // advance the reader and see if it has records
-            final InputSplit nextSplit = splitsIterator.next();
-            @SuppressWarnings("unchecked")
-            RecordReader<K, V> reader =
-                (RecordReader<K, V>) format.createRecordReader(nextSplit, attemptContext);
-            if (currentReader != null) {
-              currentReader.close();
-            }
-            currentReader = reader;
-            UGIHelper.getBestUGI(source.username()).doAs(new PrivilegedExceptionAction<Void>() {
-              @Override
-              public Void run() throws Exception {
-                currentReader.initialize(nextSplit, attemptContext);
-                return null;
-              }
-            });
-            if (currentReader.nextKeyValue()) {
-              currentPair = nextPair();
-              return true;
-            }
-            currentReader.close();
-            currentReader = null;
-          }
-          // either no next split or all readers were empty
-          currentPair = null;
-          return false;
-        }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new IOException(e);
-      }
-    }
-
-    @Override
-    public T getCurrent() throws NoSuchElementException {
-      if (currentPair == null) {
-        throw new NoSuchElementException();
-      }
-      return source.inputConverter().apply(currentPair);
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (currentReader != null) {
-        currentReader.close();
-        currentReader = null;
-      }
-      currentPair = null;
-    }
-
-    @Override
-    public BoundedSource<T> getCurrentSource() {
-      return source;
-    }
-
-    @SuppressWarnings("unchecked")
-    private KV<K, V> nextPair() throws IOException, InterruptedException {
-      K key = currentReader.getCurrentKey();
-      V value = currentReader.getCurrentValue();
-      // clone Writable objects since they are reused between calls to RecordReader#nextKeyValue
-      if (key instanceof Writable) {
-        key = (K) WritableUtils.clone((Writable) key, conf);
-      }
-      if (value instanceof Writable) {
-        value = (V) WritableUtils.clone((Writable) value, conf);
-      }
-      return KV.of(key, value);
-    }
-
-    // =======================================================================
-    // Optional overrides
-    // =======================================================================
-
-    @Override
-    public Double getFractionConsumed() {
-      if (currentReader == null) {
-        return 0.0;
-      }
-      if (splits.isEmpty()) {
-        return 1.0;
-      }
-      int index = splitsIterator.previousIndex();
-      int numReaders = splits.size();
-      if (index == numReaders) {
-        return 1.0;
-      }
-      double before = 1.0 * index / numReaders;
-      double after = 1.0 * (index + 1) / numReaders;
-      Double fractionOfCurrentReader = getProgress();
-      if (fractionOfCurrentReader == null) {
-        return before;
-      }
-      return before + fractionOfCurrentReader * (after - before);
-    }
-
-    private Double getProgress() {
-      try {
-        return (double) currentReader.getProgress();
-      } catch (IOException | InterruptedException e) {
-        return null;
-      }
-    }
-
-  }
-
-  // =======================================================================
-  // SerializableSplit
-  // =======================================================================
-
-  /**
-   * A wrapper to allow Hadoop {@link org.apache.hadoop.mapreduce.InputSplit}s to be
-   * serialized using Java's standard serialization mechanisms. Note that the InputSplit
-   * has to be Writable (which most are).
-   */
-  protected static class SerializableSplit implements Externalizable {
-    private static final long serialVersionUID = 0L;
-
-    private InputSplit split;
-
-    public SerializableSplit() {
-    }
-
-    public SerializableSplit(InputSplit split) {
-      checkArgument(split instanceof Writable, "Split is not writable: %s", split);
-      this.split = split;
-    }
-
-    public InputSplit getSplit() {
-      return split;
-    }
-
-    @Override
-    public void writeExternal(ObjectOutput out) throws IOException {
-      out.writeUTF(split.getClass().getCanonicalName());
-      ((Writable) split).write(out);
-    }
-
-    @Override
-    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-      String className = in.readUTF();
-      try {
-        split = (InputSplit) Class.forName(className).newInstance();
-        ((Writable) split).readFields(in);
-      } catch (InstantiationException | IllegalAccessException e) {
-        throw new IOException(e);
-      }
-    }
-  }
-
-}
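
Similarly, a short hedged sketch of reading with the source being removed here. The file pattern and namenode address are placeholders; Read is the org.apache.beam.sdk.io.Read transform linked from the Javadoc above.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.Read;
    import org.apache.beam.sdk.io.hdfs.HDFSFileSource;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    public class HdfsSourceSketch {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Hypothetical file pattern; fromText wires up TextInputFormat and StringUtf8Coder.
        HDFSFileSource<String, LongWritable, Text> source =
            HDFSFileSource.fromText("hdfs://namenode:8020/input/*.txt");

        PCollection<String> lines = pipeline.apply(Read.from(source));

        pipeline.run();
      }
    }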

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
deleted file mode 100644
index 154a818..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.SeekableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import org.apache.beam.sdk.io.FileSystem;
-import org.apache.beam.sdk.io.fs.CreateOptions;
-import org.apache.beam.sdk.io.fs.MatchResult;
-import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.io.fs.MatchResult.Status;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Adapts {@link org.apache.hadoop.fs.FileSystem} connectors to be used as
- * Apache Beam {@link FileSystem FileSystems}.
- *
- * <p>The following Hadoop FileSystem implementations are known to be unsupported:
- * <ul>
- *   <li>FTPFileSystem: Missing seek support within FTPInputStream</li>
- * </ul>
- *
- * <p>This implementation assumes that the underlying Hadoop {@link FileSystem} is seek
- * efficient when reading. A review of the source code (as of Hadoop 2.7.1) shows that the
- * following {@link FSInputStream} implementations provide seek support:
- * <ul>
- *   <li>HarFsInputStream</li>
- *   <li>S3InputStream</li>
- *   <li>DFSInputStream</li>
- *   <li>SwiftNativeInputStream</li>
- *   <li>NativeS3FsInputStream</li>
- *   <li>LocalFSFileInputStream</li>
- *   <li>NativeAzureFsInputStream</li>
- *   <li>S3AInputStream</li>
- * </ul>
- */
-class HadoopFileSystem extends FileSystem<HadoopResourceId> {
-  @VisibleForTesting
-  final org.apache.hadoop.fs.FileSystem fileSystem;
-
-  HadoopFileSystem(Configuration configuration) throws IOException {
-    this.fileSystem = org.apache.hadoop.fs.FileSystem.newInstance(configuration);
-  }
-
-  @Override
-  protected List<MatchResult> match(List<String> specs) {
-    ImmutableList.Builder<MatchResult> resultsBuilder = ImmutableList.builder();
-    for (String spec : specs) {
-      try {
-        FileStatus[] fileStatuses = fileSystem.globStatus(new Path(spec));
-        List<Metadata> metadata = new ArrayList<>();
-        for (FileStatus fileStatus : fileStatuses) {
-          if (fileStatus.isFile()) {
-            metadata.add(Metadata.builder()
-                .setResourceId(new HadoopResourceId(fileStatus.getPath().toUri()))
-                .setIsReadSeekEfficient(true)
-                .setSizeBytes(fileStatus.getLen())
-                .build());
-          }
-        }
-        resultsBuilder.add(MatchResult.create(Status.OK, metadata));
-      } catch (IOException e) {
-        resultsBuilder.add(MatchResult.create(Status.ERROR, e));
-      }
-    }
-    return resultsBuilder.build();
-  }
-
-  @Override
-  protected WritableByteChannel create(HadoopResourceId resourceId, CreateOptions createOptions)
-      throws IOException {
-    return Channels.newChannel(fileSystem.create(resourceId.toPath()));
-  }
-
-  @Override
-  protected ReadableByteChannel open(HadoopResourceId resourceId) throws IOException {
-    FileStatus fileStatus = fileSystem.getFileStatus(resourceId.toPath());
-    return new HadoopSeekableByteChannel(fileStatus, fileSystem.open(resourceId.toPath()));
-  }
-
-  @Override
-  protected void copy(
-      List<HadoopResourceId> srcResourceIds,
-      List<HadoopResourceId> destResourceIds) throws IOException {
-    for (int i = 0; i < srcResourceIds.size(); ++i) {
-      // Unfortunately, Hadoop FileSystems don't support a native copy operation, so we are
-      // forced to use the inefficient implementation found in FileUtil, which copies all the
-      // bytes through the local machine.
-      //
-      // The Hadoop FileSystem API does define a concat method, but we could only find
-      // DFSFileSystem implementing it, and that implementation deletes the sources afterwards,
-      // which is not what we want. All the other FileSystem implementations we examined threw
-      // UnsupportedOperationException from concat.
-      FileUtil.copy(
-          fileSystem, srcResourceIds.get(i).toPath(),
-          fileSystem, destResourceIds.get(i).toPath(),
-          false,
-          true,
-          fileSystem.getConf());
-    }
-  }
-
-  @Override
-  protected void rename(
-      List<HadoopResourceId> srcResourceIds,
-      List<HadoopResourceId> destResourceIds) throws IOException {
-    for (int i = 0; i < srcResourceIds.size(); ++i) {
-      fileSystem.rename(
-          srcResourceIds.get(i).toPath(),
-          destResourceIds.get(i).toPath());
-    }
-  }
-
-  @Override
-  protected void delete(Collection<HadoopResourceId> resourceIds) throws IOException {
-    for (HadoopResourceId resourceId : resourceIds) {
-      fileSystem.delete(resourceId.toPath(), false);
-    }
-  }
-
-  @Override
-  protected HadoopResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
-    try {
-      if (singleResourceSpec.endsWith("/") && !isDirectory) {
-        throw new IllegalArgumentException(String.format(
-            "Expected file path but received directory path %s", singleResourceSpec));
-      }
-      return !singleResourceSpec.endsWith("/") && isDirectory
-          ? new HadoopResourceId(new URI(singleResourceSpec + "/"))
-          : new HadoopResourceId(new URI(singleResourceSpec));
-    } catch (URISyntaxException e) {
-      throw new IllegalArgumentException(
-          String.format("Invalid spec %s directory %s", singleResourceSpec, isDirectory),
-          e);
-    }
-  }
-
-  @Override
-  protected String getScheme() {
-    return fileSystem.getScheme();
-  }
-
-  /** An adapter around {@link FSDataInputStream} that implements {@link SeekableByteChannel}. */
-  private static class HadoopSeekableByteChannel implements SeekableByteChannel {
-    private final FileStatus fileStatus;
-    private final FSDataInputStream inputStream;
-    private boolean closed;
-
-    private HadoopSeekableByteChannel(FileStatus fileStatus, FSDataInputStream inputStream) {
-      this.fileStatus = fileStatus;
-      this.inputStream = inputStream;
-      this.closed = false;
-    }
-
-    @Override
-    public int read(ByteBuffer dst) throws IOException {
-      if (closed) {
-        throw new IOException("Channel is closed");
-      }
-      return inputStream.read(dst);
-    }
-
-    @Override
-    public int write(ByteBuffer src) throws IOException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public long position() throws IOException {
-      if (closed) {
-        throw new IOException("Channel is closed");
-      }
-      return inputStream.getPos();
-    }
-
-    @Override
-    public SeekableByteChannel position(long newPosition) throws IOException {
-      if (closed) {
-        throw new IOException("Channel is closed");
-      }
-      inputStream.seek(newPosition);
-      return this;
-    }
-
-    @Override
-    public long size() throws IOException {
-      if (closed) {
-        throw new IOException("Channel is closed");
-      }
-      return fileStatus.getLen();
-    }
-
-    @Override
-    public SeekableByteChannel truncate(long size) throws IOException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean isOpen() {
-      return !closed;
-    }
-
-    @Override
-    public void close() throws IOException {
-      closed = true;
-      inputStream.close();
-    }
-  }
-}
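
HadoopFileSystem and its methods are package-private, so direct use looks like the test-style sketch below, imagined as living in the same org.apache.beam.sdk.io.hdfs package; in a pipeline it is reached through the Beam FileSystems facade instead. The namenode address and file pattern are placeholders.

    package org.apache.beam.sdk.io.hdfs;

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.ReadableByteChannel;
    import java.util.Collections;
    import java.util.List;
    import org.apache.beam.sdk.io.fs.MatchResult;
    import org.apache.hadoop.conf.Configuration;

    class HadoopFileSystemSketch {
      static void readMatchingFiles() throws IOException {
        Configuration conf = new Configuration(false);
        conf.set("fs.defaultFS", "hdfs://namenode:8020"); // placeholder cluster address

        HadoopFileSystem hadoopFs = new HadoopFileSystem(conf);

        // match() returns one MatchResult per spec; each Metadata entry wraps a HadoopResourceId.
        List<MatchResult> results =
            hadoopFs.match(Collections.singletonList("hdfs://namenode:8020/input/*.txt"));
        for (MatchResult.Metadata metadata : results.get(0).metadata()) {
          try (ReadableByteChannel channel =
              hadoopFs.open((HadoopResourceId) metadata.resourceId())) {
            ByteBuffer buffer = ByteBuffer.allocate(64 * 1024);
            while (channel.read(buffer) != -1) {
              buffer.clear(); // discard the bytes; a real caller would consume them
            }
          }
        }
      }
    }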

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java
deleted file mode 100644
index 2cb9d8a..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.auto.service.AutoService;
-import java.io.IOException;
-import java.util.Map;
-import java.util.TreeMap;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * A Jackson {@link Module} that registers a {@link JsonSerializer} and {@link JsonDeserializer}
- * for a Hadoop {@link Configuration}. The serialized representation is that of a JSON map.
- *
- * <p>Note that the serialization of the Hadoop {@link Configuration} only keeps the keys and
- * their values, dropping any configuration hierarchy and source information.
- */
-@AutoService(Module.class)
-public class HadoopFileSystemModule extends SimpleModule {
-  public HadoopFileSystemModule() {
-    super("HadoopFileSystemModule");
-    setMixInAnnotation(Configuration.class, ConfigurationMixin.class);
-  }
-
-  /** A mixin class to add Jackson annotations to the Hadoop {@link Configuration} class. */
-  @JsonDeserialize(using = ConfigurationDeserializer.class)
-  @JsonSerialize(using = ConfigurationSerializer.class)
-  private static class ConfigurationMixin {}
-
-  /** A Jackson {@link JsonDeserializer} for Hadoop {@link Configuration} objects. */
-  static class ConfigurationDeserializer extends JsonDeserializer<Configuration> {
-    @Override
-    public Configuration deserialize(JsonParser jsonParser,
-        DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
-      Map<String, String> rawConfiguration =
-          jsonParser.readValueAs(new TypeReference<Map<String, String>>() {});
-      Configuration configuration = new Configuration(false);
-      for (Map.Entry<String, String> entry : rawConfiguration.entrySet()) {
-        configuration.set(entry.getKey(), entry.getValue());
-      }
-      return configuration;
-    }
-  }
-
-  /** A Jackson {@link JsonSerializer} for Hadoop {@link Configuration} objects. */
-  static class ConfigurationSerializer extends JsonSerializer<Configuration> {
-    @Override
-    public void serialize(Configuration configuration, JsonGenerator jsonGenerator,
-        SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
-      Map<String, String> map = new TreeMap<>();
-      for (Map.Entry<String, String> entry : configuration) {
-        map.put(entry.getKey(), entry.getValue());
-      }
-      jsonGenerator.writeObject(map);
-    }
-  }
-}
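
For orientation, here is a minimal round-trip sketch of how a Jackson module like the one above could be exercised; the class name HadoopConfigurationJsonRoundTrip and the property values are illustrative only, not part of this commit:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.hadoop.conf.Configuration;

    public class HadoopConfigurationJsonRoundTrip {
      public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration(false);
        configuration.set("fs.default.name", "hdfs://localhost:9998");

        // Register the module explicitly here; inside the SDK it is expected to be
        // discovered automatically via @AutoService(Module.class) and ObjectMapper#findModules().
        ObjectMapper mapper = new ObjectMapper().registerModule(new HadoopFileSystemModule());

        String json = mapper.writeValueAsString(configuration);
        // json is a flat JSON map, e.g. {"fs.default.name":"hdfs://localhost:9998"}
        Configuration roundTripped = mapper.readValue(json, Configuration.class);
        System.out.println(roundTripped.get("fs.default.name"));
      }
    }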

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
deleted file mode 100644
index 31250bc..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import java.util.List;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.DefaultValueFactory;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * {@link PipelineOptions} which encapsulate {@link Configuration Hadoop Configuration}
- * for the {@link HadoopFileSystem}.
- */
-public interface HadoopFileSystemOptions extends PipelineOptions {
-  @Description("A list of Hadoop configurations used to configure zero or more Hadoop filesystems. "
-      + "To specify on the command-line, represent the value as a JSON list of JSON maps, where "
-      + "each map represents the entire configuration for a single Hadoop filesystem. For example "
-      + "--hdfsConfiguration='[{\"fs.default.name\": \"hdfs://localhost:9998\", ...},"
-      + "{\"fs.default.name\": \"s3a://\", ...},...]'")
-  @Default.InstanceFactory(ConfigurationLocator.class)
-  List<Configuration> getHdfsConfiguration();
-  void setHdfsConfiguration(List<Configuration> value);
-
-  /** A {@link DefaultValueFactory} which locates a Hadoop {@link Configuration}. */
-  class ConfigurationLocator implements DefaultValueFactory<List<Configuration>> {
-    @Override
-    public List<Configuration> create(PipelineOptions options) {
-      // TODO: Find default configuration to use
-      return null;
-    }
-  }
-}
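
As a rough usage sketch (hypothetical class name, illustrative configuration values), these options can be populated from command-line arguments via PipelineOptionsFactory; note that getHdfsConfiguration() may be null when nothing is supplied and no default is located:

    import java.util.List;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.hadoop.conf.Configuration;

    public class HadoopFileSystemOptionsExample {
      public static void main(String[] args) {
        // Example argument (shell quoting omitted):
        //   --hdfsConfiguration=[{"fs.default.name": "hdfs://localhost:9998"}]
        HadoopFileSystemOptions options =
            PipelineOptionsFactory.fromArgs(args).as(HadoopFileSystemOptions.class);

        // Parsing List<Configuration> from JSON assumes HadoopFileSystemModule is on the
        // classpath so the options machinery can deserialize Configuration values.
        List<Configuration> configurations = options.getHdfsConfiguration();
        if (configurations != null) {
          for (Configuration configuration : configurations) {
            System.out.println(configuration.get("fs.default.name"));
          }
        }
      }
    }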

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java
deleted file mode 100644
index 344623b..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableList;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
-
-/**
- * {@link AutoService} registrar for {@link HadoopFileSystemOptions}.
- */
-@AutoService(PipelineOptionsRegistrar.class)
-public class HadoopFileSystemOptionsRegistrar implements PipelineOptionsRegistrar {
-
-  @Override
-  public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
-    return ImmutableList.<Class<? extends PipelineOptions>>of(HadoopFileSystemOptions.class);
-  }
-}
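
Because the registrar is published with @AutoService, it is visible through the standard ServiceLoader mechanism; a small sketch (hypothetical class name) of listing whatever registrars are on the classpath:

    import java.util.ServiceLoader;
    import org.apache.beam.sdk.options.PipelineOptionsRegistrar;

    public class ListOptionsRegistrars {
      public static void main(String[] args) {
        // Each registrar contributes PipelineOptions interfaces, e.g. HadoopFileSystemOptions.
        for (PipelineOptionsRegistrar registrar : ServiceLoader.load(PipelineOptionsRegistrar.class)) {
          System.out.println(registrar.getClass().getName() + " -> " + registrar.getPipelineOptions());
        }
      }
    }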

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
deleted file mode 100644
index 9159df3..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import javax.annotation.Nonnull;
-import org.apache.beam.sdk.io.FileSystem;
-import org.apache.beam.sdk.io.FileSystemRegistrar;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * {@link AutoService} registrar for the {@link HadoopFileSystem}.
- */
-@AutoService(FileSystemRegistrar.class)
-public class HadoopFileSystemRegistrar implements FileSystemRegistrar {
-
-  @Override
-  public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) {
-    List<Configuration> configurations =
-        options.as(HadoopFileSystemOptions.class).getHdfsConfiguration();
-    if (configurations == null) {
-      configurations = Collections.emptyList();
-    }
-    checkArgument(configurations.size() <= 1,
-        String.format(
-            "The %s currently only supports at most a single Hadoop configuration.",
-            HadoopFileSystemRegistrar.class.getSimpleName()));
-
-    ImmutableList.Builder<FileSystem> builder = ImmutableList.builder();
-    for (Configuration configuration : configurations) {
-      try {
-        builder.add(new HadoopFileSystem(configuration));
-      } catch (IOException e) {
-        throw new IllegalArgumentException(String.format(
-            "Failed to construct Hadoop filesystem with configuration %s", configuration), e);
-      }
-    }
-    return builder.build();
-  }
-}
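
A direct-invocation sketch of the registrar (hypothetical class name; in the SDK the registrar is normally discovered through FileSystemRegistrar service loading rather than called by hand):

    import com.google.common.collect.ImmutableList;
    import org.apache.beam.sdk.io.FileSystem;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.hadoop.conf.Configuration;

    public class HadoopFileSystemRegistrarExample {
      public static void main(String[] args) {
        HadoopFileSystemOptions options = PipelineOptionsFactory.as(HadoopFileSystemOptions.class);
        // A default Hadoop Configuration resolves to the local filesystem unless
        // fs.defaultFS points elsewhere.
        options.setHdfsConfiguration(ImmutableList.of(new Configuration()));

        for (FileSystem fileSystem : new HadoopFileSystemRegistrar().fromOptions(options)) {
          System.out.println("Constructed filesystem: " + fileSystem);
        }
      }
    }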

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
deleted file mode 100644
index e570864..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import java.net.URI;
-import java.util.Objects;
-import org.apache.beam.sdk.io.fs.ResolveOptions;
-import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.hadoop.fs.Path;
-
-/**
- * {@link ResourceId} implementation for the {@link HadoopFileSystem}.
- */
-class HadoopResourceId implements ResourceId {
-  private final URI uri;
-
-  HadoopResourceId(URI uri) {
-    this.uri = uri;
-  }
-
-  @Override
-  public ResourceId resolve(String other, ResolveOptions resolveOptions) {
-    return new HadoopResourceId(uri.resolve(other));
-  }
-
-  @Override
-  public ResourceId getCurrentDirectory() {
-    return new HadoopResourceId(uri.getPath().endsWith("/") ? uri : uri.resolve("."));
-  }
-
-  public boolean isDirectory() {
-    return uri.getPath().endsWith("/");
-  }
-
-  @Override
-  public String getFilename() {
-    return new Path(uri).getName();
-  }
-
-  @Override
-  public String getScheme() {
-    return uri.getScheme();
-  }
-
-  @Override
-  public String toString() {
-    return uri.toString();
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (!(obj instanceof HadoopResourceId)) {
-      return false;
-    }
-    return Objects.equals(uri, ((HadoopResourceId) obj).uri);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(uri);
-  }
-
-  Path toPath() {
-    return new Path(uri);
-  }
-}
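
To make the resolve and getCurrentDirectory semantics above concrete, a small sketch; since HadoopResourceId is package-private, such code would have to live in org.apache.beam.sdk.io.hdfs, and the URI shown is illustrative:

    package org.apache.beam.sdk.io.hdfs;

    import java.net.URI;
    import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
    import org.apache.beam.sdk.io.fs.ResourceId;

    public class HadoopResourceIdExample {
      public static void main(String[] args) {
        ResourceId dir = new HadoopResourceId(URI.create("hdfs://localhost:9998/data/"));
        ResourceId file = dir.resolve("part-00000", StandardResolveOptions.RESOLVE_FILE);

        System.out.println(file);                        // hdfs://localhost:9998/data/part-00000
        System.out.println(file.getFilename());          // part-00000
        System.out.println(file.getCurrentDirectory());  // hdfs://localhost:9998/data/
      }
    }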

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
deleted file mode 100644
index fe2db5f..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-
-/**
- * This class is deprecated, and only exists for HDFSFileSink.
- */
-@Deprecated
-public abstract class Sink<T> implements Serializable, HasDisplayData {
-  /**
-   * Ensures that the sink is valid and can be written to before the write operation begins.
-   * Implementations should typically use {@link com.google.common.base.Preconditions} for this.
-   */
-  public abstract void validate(PipelineOptions options);
-
-  /**
-   * Returns an instance of a {@link WriteOperation} that can write to this Sink.
-   */
-  public abstract WriteOperation<T, ?> createWriteOperation();
-
-  /**
-   * {@inheritDoc}
-   *
-   * <p>By default, does not register any display data. Implementors may override this method
-   * to provide their own display data.
-   */
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {}
-
-  /**
-   * A {@link WriteOperation} defines the process of a parallel write of objects to a Sink.
-   *
-   * <p>The {@code WriteOperation} defines how to perform initialization and finalization of a
-   * parallel write to a sink as well as how to create a {@link Sink.Writer} object that can write
-   * a bundle to the sink.
-   *
-   * <p>Since operations in Beam may be run multiple times for redundancy or fault-tolerance,
-   * the initialization and finalization defined by a WriteOperation <b>must be idempotent</b>.
-   *
-   * <p>{@code WriteOperation}s may be mutable; a {@code WriteOperation} is serialized after the
-   * call to the {@code initialize} method and deserialized before calls to
-   * {@code createWriter} and {@code finalize}. However, it is not
-   * reserialized after {@code createWriter}, so {@code createWriter} should not mutate the
-   * state of the {@code WriteOperation}.
-   *
-   * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink.
-   *
-   * @param <T> The type of objects to write
-   * @param <WriteT> The result of a per-bundle write
-   */
-  public abstract static class WriteOperation<T, WriteT> implements Serializable {
-    /**
-     * Performs initialization before writing to the sink. Called before writing begins.
-     */
-    public abstract void initialize(PipelineOptions options) throws Exception;
-
-    /**
-     * Indicates that the operation will be performing windowed writes.
-     */
-    public abstract void setWindowedWrites(boolean windowedWrites);
-
-    /**
-     * Given an Iterable of results from bundle writes, performs finalization after writing and
-     * closes the sink. Called after all bundle writes are complete.
-     *
-     * <p>The results that are passed to finalize are those returned by bundles that completed
-     * successfully. Although bundles may have been run multiple times (for fault-tolerance), only
-     * one writer result will be passed to finalize for each bundle. An implementation of finalize
-     * should clean up the output of any failed or retried bundle attempts. Note that these
-     * failed bundles will not have their writer result passed to finalize, so finalize should be
-     * capable of locating any temporary/partial output written by failed bundles.
-     *
-     * <p>A best practice is to make finalize atomic. If this is impossible given the semantics
-     * of the sink, finalize should be idempotent, as it may be called multiple times in the case of
-     * failure/retry or for redundancy.
-     *
-     * <p>Note that the iteration order of the writer results is not guaranteed to be consistent if
-     * finalize is called multiple times.
-     *
-     * @param writerResults an Iterable of results from successful bundle writes.
-     */
-    public abstract void finalize(Iterable<WriteT> writerResults, PipelineOptions options)
-        throws Exception;
-
-    /**
-     * Creates a new {@link Sink.Writer} to write a bundle of the input to the sink.
-     *
-     * <p>The bundle id that the writer will use to uniquely identify its output will be passed to
-     * {@link Writer#openWindowed} or {@link Writer#openUnwindowed}.
-     *
-     * <p>Must not mutate the state of the WriteOperation.
-     */
-    public abstract Writer<T, WriteT> createWriter(PipelineOptions options) throws Exception;
-
-    /**
-     * Returns the Sink that this write operation writes to.
-     */
-    public abstract Sink<T> getSink();
-
-    /**
-     * Returns a coder for the writer result type.
-     */
-    public abstract Coder<WriteT> getWriterResultCoder();
-  }
-
-  /**
-   * A Writer writes a bundle of elements from a PCollection to a sink.
-   * {@link Writer#openWindowed} or {@link Writer#openUnwindowed} is called before writing begins
-   * and {@link Writer#close} is called after all elements in the bundle have been written.
-   * {@link Writer#write} writes an element to the sink.
-   *
-   * <p>Note that any access to static members or methods of a Writer must be thread-safe, as
-   * multiple instances of a Writer may be instantiated in different threads on the same worker.
-   *
-   * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink.
-   *
-   * @param <T> The type of object to write
-   * @param <WriteT> The writer results type (e.g., the bundle's output filename, as String)
-   */
-  public abstract static class Writer<T, WriteT> {
-    /**
-     * Performs bundle initialization. For example, creates a temporary file for writing or
-     * initializes any state that will be used across calls to {@link Writer#write}.
-     *
-     * <p>The unique id that is given to open should be used to ensure that the writer's output does
-     * not interfere with the output of other Writers, as a bundle may be executed many times for
-     * fault tolerance. See {@link Sink} for more information about bundle ids.
-     *
-     * <p>The window and paneInfo arguments are populated when windowed writes are requested.
-     * shard and numShards are populated for the case of static sharding. In cases where the
-     * runner is dynamically picking sharding, shard and numShards might both be set to -1.
-     */
-    public abstract void openWindowed(String uId,
-                                      BoundedWindow window,
-                                      PaneInfo paneInfo,
-                                      int shard,
-                                      int numShards) throws Exception;
-
-    /**
-     * Performs bundle initialization for the case where the file is written unwindowed.
-     */
-    public abstract void openUnwindowed(String uId,
-                                        int shard,
-                                        int numShards) throws Exception;
-
-    public abstract void cleanup() throws Exception;
-
-    /**
-     * Called for each value in the bundle.
-     */
-    public abstract void write(T value) throws Exception;
-
-    /**
-     * Finishes writing the bundle. Closes any resources used for writing the bundle.
-     *
-     * <p>Returns a writer result that will be used in the {@link Sink.WriteOperation}'s
-     * finalization. The result should contain some way to identify the output of this bundle (using
-     * the bundle id). {@link WriteOperation#finalize} will use the writer result to identify
-     * successful writes. See {@link Sink} for more information about bundle ids.
-     *
-     * @return the writer result
-     */
-    public abstract WriteT close() throws Exception;
-
-    /**
-     * Returns the write operation this writer belongs to.
-     */
-    public abstract WriteOperation<T, WriteT> getWriteOperation();
-
-
-  }
-}
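
The lifecycle documented above (validate, initialize, then per-bundle createWriter/open/write/close, then finalize) can be summarized with a simplified single-threaded driver. This is only an illustration of the contract, with made-up class and method names, and is not how any runner actually executes a write:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;
    import org.apache.beam.sdk.io.hdfs.Sink;
    import org.apache.beam.sdk.options.PipelineOptions;

    public class SinkLifecycleSketch {
      static <T, WriteT> void runUnwindowed(Sink.WriteOperation<T, WriteT> writeOperation,
          PipelineOptions options, List<List<T>> bundles) throws Exception {
        writeOperation.getSink().validate(options);
        writeOperation.initialize(options);        // may run more than once; must be idempotent
        writeOperation.setWindowedWrites(false);

        List<WriteT> results = new ArrayList<>();
        int shard = 0;
        for (List<T> bundle : bundles) {
          Sink.Writer<T, WriteT> writer = writeOperation.createWriter(options);
          writer.openUnwindowed(UUID.randomUUID().toString(), shard++, bundles.size());
          for (T element : bundle) {
            writer.write(element);
          }
          results.add(writer.close());             // one writer result per successful bundle
          // On failure a runner would call writer.cleanup() and retry the bundle instead.
        }
        writeOperation.finalize(results, options); // must be idempotent, ideally atomic
      }
    }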

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java
deleted file mode 100644
index fd05a19..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import java.io.IOException;
-import javax.annotation.Nullable;
-import org.apache.hadoop.security.UserGroupInformation;
-
-/**
- * {@link UserGroupInformation} helper methods.
- */
-public class UGIHelper {
-
-  /**
-   * Find the most appropriate UserGroupInformation to use.
-   * @param username the user name, or null if none is specified.
-   * @return the most appropriate UserGroupInformation
-   */
-  public static UserGroupInformation getBestUGI(@Nullable String username) throws IOException {
-    return UserGroupInformation.getBestUGI(null, username);
-  }
-
-}
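
A minimal sketch of how getBestUGI might be combined with UserGroupInformation#doAs to touch HDFS as a particular user; the wrapper class and method below are hypothetical:

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;
    import javax.annotation.Nullable;
    import org.apache.beam.sdk.io.hdfs.UGIHelper;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.UserGroupInformation;

    public class UGIHelperExample {
      /** Checks whether a path exists, impersonating the given user when one is specified. */
      public static boolean exists(@Nullable String username, final String path)
          throws IOException, InterruptedException {
        UserGroupInformation ugi = UGIHelper.getBestUGI(username);
        return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
          @Override
          public Boolean run() throws IOException {
            FileSystem fileSystem = FileSystem.get(new Configuration());
            return fileSystem.exists(new Path(path));
          }
        });
      }
    }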