Posted to commits@beam.apache.org by da...@apache.org on 2016/04/15 19:29:19 UTC

[1/2] incubator-beam git commit: [BEAM-77] Move hadoop contrib into hdfs IO

Repository: incubator-beam
Updated Branches:
  refs/heads/master bcefff6a3 -> c8ed39806


[BEAM-77] Move hadoop contrib into hdfs IO


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/404b633d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/404b633d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/404b633d

Branch: refs/heads/master
Commit: 404b633d43d23940a7f11a93a472980f7bb09ce7
Parents: bcefff6
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Fri Apr 15 08:48:10 2016 +0200
Committer: Davor Bonaci <da...@google.com>
Committed: Fri Apr 15 10:28:48 2016 -0700

----------------------------------------------------------------------
 contrib/hadoop/README.md                        |  24 -
 contrib/hadoop/pom.xml                          | 170 -------
 .../apache/contrib/hadoop/HadoopFileSource.java | 486 -------------------
 .../apache/contrib/hadoop/WritableCoder.java    | 111 -----
 .../contrib/hadoop/HadoopFileSourceTest.java    | 190 --------
 .../contrib/hadoop/WritableCoderTest.java       |  37 --
 sdks/java/io/hdfs/README.md                     |  24 +
 sdks/java/io/hdfs/pom.xml                       |  65 +++
 .../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 486 +++++++++++++++++++
 .../apache/beam/sdk/io/hdfs/WritableCoder.java  | 111 +++++
 .../beam/sdk/io/hdfs/HDFSFileSourceTest.java    | 190 ++++++++
 .../beam/sdk/io/hdfs/WritableCoderTest.java     |  37 ++
 sdks/java/io/pom.xml                            |  41 ++
 sdks/java/pom.xml                               |   1 +
 14 files changed, 955 insertions(+), 1018 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/contrib/hadoop/README.md
----------------------------------------------------------------------
diff --git a/contrib/hadoop/README.md b/contrib/hadoop/README.md
deleted file mode 100644
index 49bbf98..0000000
--- a/contrib/hadoop/README.md
+++ /dev/null
@@ -1,24 +0,0 @@
-# Hadoop module
-
-This library provides Dataflow sources and sinks to make it possible to read and
-write Apache Hadoop file formats from Dataflow pipelines.
-
-Currently, only the read path is implemented. A `HadoopFileSource` allows any
-Hadoop `FileInputFormat` to be read as a `PCollection`.
-
-A `HadoopFileSource` can be read from using the
-`com.google.cloud.dataflow.sdk.io.Read` transform. For example:
-
-```java
-HadoopFileSource<K, V> source = HadoopFileSource.from(path, MyInputFormat.class,
-  MyKey.class, MyValue.class);
-PCollection<KV<MyKey, MyValue>> records = Read.from(mySource);
-```
-
-Alternatively, the `readFrom` method is a convenience method that returns a read
-transform. For example:
-
-```java
-PCollection<KV<MyKey, MyValue>> records = HadoopFileSource.readFrom(path,
-  MyInputFormat.class, MyKey.class, MyValue.class);
-```

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/contrib/hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/hadoop/pom.xml b/contrib/hadoop/pom.xml
deleted file mode 100644
index 24e454b..0000000
--- a/contrib/hadoop/pom.xml
+++ /dev/null
@@ -1,170 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <groupId>com.google.cloud.dataflow</groupId>
-  <artifactId>google-cloud-dataflow-java-contrib-hadoop</artifactId>
-  <name>Google Cloud Dataflow Hadoop Library</name>
-  <description>Library to read and write Hadoop file formats from Dataflow.</description>
-  <version>0.0.1-SNAPSHOT</version>
-  <packaging>jar</packaging>
-
-  <licenses>
-    <license>
-      <name>Apache License, Version 2.0</name>
-      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
-      <distribution>repo</distribution>
-    </license>
-  </licenses>
-
-  <properties>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <beam-version>[0.1.0, 1.0.0)</beam-version>
-  </properties>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <version>3.2</version>
-        <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
-        </configuration>
-      </plugin>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-checkstyle-plugin</artifactId>
-        <version>2.12</version>
-        <dependencies>
-          <dependency>
-            <groupId>com.puppycrawl.tools</groupId>
-            <artifactId>checkstyle</artifactId>
-            <version>6.6</version>
-          </dependency>
-        </dependencies>
-        <configuration>
-          <configLocation>../../sdks/java/checkstyle.xml</configLocation>
-          <consoleOutput>true</consoleOutput>
-          <failOnViolation>true</failOnViolation>
-          <includeTestSourceDirectory>true</includeTestSourceDirectory>
-        </configuration>
-        <executions>
-          <execution>
-            <goals>
-              <goal>check</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-
-      <!-- Source plugin for generating source and test-source JARs. -->
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-source-plugin</artifactId>
-        <version>2.4</version>
-        <executions>
-          <execution>
-            <id>attach-sources</id>
-            <phase>compile</phase>
-            <goals>
-              <goal>jar</goal>
-            </goals>
-          </execution>
-          <execution>
-            <id>attach-test-sources</id>
-            <phase>test-compile</phase>
-            <goals>
-              <goal>test-jar</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-javadoc-plugin</artifactId>
-        <configuration>
-          <windowtitle>Google Cloud Dataflow Hadoop Contrib</windowtitle>
-          <doctitle>Google Cloud Dataflow Hadoop Contrib</doctitle>
-
-          <subpackages>com.google.cloud.dataflow.contrib.hadoop</subpackages>
-          <use>false</use>
-          <bottom><![CDATA[<br>]]></bottom>
-
-          <offlineLinks>
-            <offlineLink>
-              <url>https://cloud.google.com/dataflow/java-sdk/JavaDoc/</url>
-              <location>${basedir}/../../javadoc/dataflow-sdk-docs</location>
-            </offlineLink>
-            <offlineLink>
-              <url>http://docs.guava-libraries.googlecode.com/git-history/release18/javadoc/</url>
-              <location>${basedir}/../../javadoc/guava-docs</location>
-            </offlineLink>
-          </offlineLinks>
-        </configuration>
-        <executions>
-          <execution>
-            <goals>
-              <goal>jar</goal>
-            </goals>
-            <phase>package</phase>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>java-sdk-all</artifactId>
-      <version>${beam-version}</version>
-    </dependency>
-
-    <!-- @tomwhite: Hadoop doesn't have great RPC client compatibility between one version and
-    another so it's common to mark the Hadoop dependency as provided and have users specify the
-    version they need in their project. -->
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client</artifactId>
-      <version>2.7.0</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <!-- test dependencies -->
-    <dependency>
-      <groupId>org.hamcrest</groupId>
-      <artifactId>hamcrest-all</artifactId>
-      <version>1.3</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <version>4.11</version>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/contrib/hadoop/src/main/java/org/apache/contrib/hadoop/HadoopFileSource.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop/src/main/java/org/apache/contrib/hadoop/HadoopFileSource.java b/contrib/hadoop/src/main/java/org/apache/contrib/hadoop/HadoopFileSource.java
deleted file mode 100644
index 65862f7..0000000
--- a/contrib/hadoop/src/main/java/org/apache/contrib/hadoop/HadoopFileSource.java
+++ /dev/null
@@ -1,486 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.hadoop;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.values.KV;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-
-/**
- * A {@code BoundedSource} for reading files resident in a Hadoop filesystem using a
- * Hadoop file-based input format.
- *
- * <p>To read a {@link org.apache.beam.sdk.values.PCollection} of
- * {@link org.apache.beam.sdk.values.KV} key-value pairs from one or more
- * Hadoop files, use {@link HadoopFileSource#from} to specify the path(s) of the files to
- * read, the Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}, the
- * key class and the value class.
- *
- * <p>A {@code HadoopFileSource} can be read from using the
- * {@link org.apache.beam.sdk.io.Read} transform. For example:
- *
- * <pre>
- * {@code
- * HadoopFileSource<K, V> source = HadoopFileSource.from(path, MyInputFormat.class,
- *   MyKey.class, MyValue.class);
- * PCollection<KV<MyKey, MyValue>> records = Read.from(mySource);
- * }
- * </pre>
- *
- * <p>The {@link HadoopFileSource#readFrom} method is a convenience method
- * that returns a read transform. For example:
- *
- * <pre>
- * {@code
- * PCollection<KV<MyKey, MyValue>> records = HadoopFileSource.readFrom(path,
- *   MyInputFormat.class, MyKey.class, MyValue.class);
- * }
- * </pre>
- *
- * Implementation note: Since Hadoop's {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}
- * determines the input splits, this class extends {@link BoundedSource} rather than
- * {@link org.apache.beam.sdk.io.OffsetBasedSource}, since the latter
- * dictates input splits.
-
- * @param <K> The type of keys to be read from the source.
- * @param <V> The type of values to be read from the source.
- */
-public class HadoopFileSource<K, V> extends BoundedSource<KV<K, V>> {
-  private static final long serialVersionUID = 0L;
-
-  private final String filepattern;
-  private final Class<? extends FileInputFormat<?, ?>> formatClass;
-  private final Class<K> keyClass;
-  private final Class<V> valueClass;
-  private final SerializableSplit serializableSplit;
-
-  /**
-   * Creates a {@code Read} transform that will read from an {@code HadoopFileSource}
-   * with the given file name or pattern ("glob") using the given Hadoop
-   * {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat},
-   * with key-value types specified by the given key class and value class.
-   */
-  public static <K, V, T extends FileInputFormat<K, V>> Read.Bounded<KV<K, V>> readFrom(
-      String filepattern, Class<T> formatClass, Class<K> keyClass, Class<V> valueClass) {
-    return Read.from(from(filepattern, formatClass, keyClass, valueClass));
-  }
-
-  /**
-   * Creates a {@code HadoopFileSource} that reads from the given file name or pattern ("glob")
-   * using the given Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat},
-   * with key-value types specified by the given key class and value class.
-   */
-  public static <K, V, T extends FileInputFormat<K, V>> HadoopFileSource<K, V> from(
-      String filepattern, Class<T> formatClass, Class<K> keyClass, Class<V> valueClass) {
-    @SuppressWarnings("unchecked")
-    HadoopFileSource<K, V> source = (HadoopFileSource<K, V>)
-        new HadoopFileSource(filepattern, formatClass, keyClass, valueClass);
-    return source;
-  }
-
-  /**
-   * Create a {@code HadoopFileSource} based on a file or a file pattern specification.
-   */
-  private HadoopFileSource(String filepattern,
-      Class<? extends FileInputFormat<?, ?>> formatClass, Class<K> keyClass,
-      Class<V> valueClass) {
-    this(filepattern, formatClass, keyClass, valueClass, null);
-  }
-
-  /**
-   * Create a {@code HadoopFileSource} based on a single Hadoop input split, which won't be
-   * split up further.
-   */
-  private HadoopFileSource(String filepattern,
-      Class<? extends FileInputFormat<?, ?>> formatClass, Class<K> keyClass,
-      Class<V> valueClass, SerializableSplit serializableSplit) {
-    this.filepattern = filepattern;
-    this.formatClass = formatClass;
-    this.keyClass = keyClass;
-    this.valueClass = valueClass;
-    this.serializableSplit = serializableSplit;
-  }
-
-  public String getFilepattern() {
-    return filepattern;
-  }
-
-  public Class<? extends FileInputFormat<?, ?>> getFormatClass() {
-    return formatClass;
-  }
-
-  public Class<K> getKeyClass() {
-    return keyClass;
-  }
-
-  public Class<V> getValueClass() {
-    return valueClass;
-  }
-
-  @Override
-  public void validate() {
-    Preconditions.checkNotNull(filepattern,
-        "need to set the filepattern of a HadoopFileSource");
-    Preconditions.checkNotNull(formatClass,
-        "need to set the format class of a HadoopFileSource");
-    Preconditions.checkNotNull(keyClass,
-        "need to set the key class of a HadoopFileSource");
-    Preconditions.checkNotNull(valueClass,
-        "need to set the value class of a HadoopFileSource");
-  }
-
-  @Override
-  public List<? extends BoundedSource<KV<K, V>>> splitIntoBundles(long desiredBundleSizeBytes,
-      PipelineOptions options) throws Exception {
-    if (serializableSplit == null) {
-      return Lists.transform(computeSplits(desiredBundleSizeBytes),
-          new Function<InputSplit, BoundedSource<KV<K, V>>>() {
-        @Nullable @Override
-        public BoundedSource<KV<K, V>> apply(@Nullable InputSplit inputSplit) {
-          return new HadoopFileSource<K, V>(filepattern, formatClass, keyClass,
-              valueClass, new SerializableSplit(inputSplit));
-        }
-      });
-    } else {
-      return ImmutableList.of(this);
-    }
-  }
-
-  private FileInputFormat<?, ?> createFormat(Job job) throws IOException, IllegalAccessException,
-      InstantiationException {
-    Path path = new Path(filepattern);
-    FileInputFormat.addInputPath(job, path);
-    return formatClass.newInstance();
-  }
-
-  private List<InputSplit> computeSplits(long desiredBundleSizeBytes) throws IOException,
-      IllegalAccessException, InstantiationException {
-    Job job = Job.getInstance();
-    FileInputFormat.setMinInputSplitSize(job, desiredBundleSizeBytes);
-    FileInputFormat.setMaxInputSplitSize(job, desiredBundleSizeBytes);
-    return createFormat(job).getSplits(job);
-  }
-
-  @Override
-  public BoundedReader<KV<K, V>> createReader(PipelineOptions options) throws IOException {
-    this.validate();
-
-    if (serializableSplit == null) {
-      return new HadoopFileReader<>(this, filepattern, formatClass);
-    } else {
-      return new HadoopFileReader<>(this, filepattern, formatClass,
-          serializableSplit.getSplit());
-    }
-  }
-
-  @Override
-  public Coder<KV<K, V>> getDefaultOutputCoder() {
-    return KvCoder.of(getDefaultCoder(keyClass), getDefaultCoder(valueClass));
-  }
-
-  @SuppressWarnings("unchecked")
-  private <T> Coder<T> getDefaultCoder(Class<T> c) {
-    if (Writable.class.isAssignableFrom(c)) {
-      Class<? extends Writable> writableClass = (Class<? extends Writable>) c;
-      return (Coder<T>) WritableCoder.of(writableClass);
-    } else if (Void.class.equals(c)) {
-      return (Coder<T>) VoidCoder.of();
-    }
-    // TODO: how to use registered coders here?
-    throw new IllegalStateException("Cannot find coder for " + c);
-  }
-
-  // BoundedSource
-
-  @Override
-  public long getEstimatedSizeBytes(PipelineOptions options) {
-    long size = 0;
-    try {
-      Job job = Job.getInstance(); // new instance
-      for (FileStatus st : listStatus(createFormat(job), job)) {
-        size += st.getLen();
-      }
-    } catch (IOException | NoSuchMethodException | InvocationTargetException
-        | IllegalAccessException | InstantiationException e) {
-      // ignore, and return 0
-    }
-    return size;
-  }
-
-  private <K, V> List<FileStatus> listStatus(FileInputFormat<K, V> format,
-      JobContext jobContext) throws NoSuchMethodException, InvocationTargetException,
-      IllegalAccessException {
-    // FileInputFormat#listStatus is protected, so call using reflection
-    Method listStatus = FileInputFormat.class.getDeclaredMethod("listStatus", JobContext.class);
-    listStatus.setAccessible(true);
-    @SuppressWarnings("unchecked")
-    List<FileStatus> stat = (List<FileStatus>) listStatus.invoke(format, jobContext);
-    return stat;
-  }
-
-  @Override
-  public boolean producesSortedKeys(PipelineOptions options) throws Exception {
-    return false;
-  }
-
-  static class HadoopFileReader<K, V> extends BoundedSource.BoundedReader<KV<K, V>> {
-
-    private final BoundedSource<KV<K, V>> source;
-    private final String filepattern;
-    private final Class formatClass;
-
-    private FileInputFormat<?, ?> format;
-    private TaskAttemptContext attemptContext;
-    private List<InputSplit> splits;
-    private ListIterator<InputSplit> splitsIterator;
-    private Configuration conf;
-    private RecordReader<K, V> currentReader;
-    private KV<K, V> currentPair;
-
-    /**
-     * Create a {@code HadoopFileReader} based on a file or a file pattern specification.
-     */
-    public HadoopFileReader(BoundedSource<KV<K, V>> source, String filepattern,
-        Class<? extends FileInputFormat<?, ?>> formatClass) {
-      this(source, filepattern, formatClass, null);
-    }
-
-    /**
-     * Create a {@code HadoopFileReader} based on a single Hadoop input split.
-     */
-    public HadoopFileReader(BoundedSource<KV<K, V>> source, String filepattern,
-        Class<? extends FileInputFormat<?, ?>> formatClass, InputSplit split) {
-      this.source = source;
-      this.filepattern = filepattern;
-      this.formatClass = formatClass;
-      if (split != null) {
-        this.splits = ImmutableList.of(split);
-        this.splitsIterator = splits.listIterator();
-      }
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      Job job = Job.getInstance(); // new instance
-      Path path = new Path(filepattern);
-      FileInputFormat.addInputPath(job, path);
-
-      try {
-        @SuppressWarnings("unchecked")
-        FileInputFormat<K, V> f = (FileInputFormat<K, V>) formatClass.newInstance();
-        this.format = f;
-      } catch (InstantiationException | IllegalAccessException e) {
-        throw new IOException("Cannot instantiate file input format " + formatClass, e);
-      }
-      this.attemptContext = new TaskAttemptContextImpl(job.getConfiguration(),
-          new TaskAttemptID());
-
-      if (splitsIterator == null) {
-        this.splits = format.getSplits(job);
-        this.splitsIterator = splits.listIterator();
-      }
-      this.conf = job.getConfiguration();
-      return advance();
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      try {
-        if (currentReader != null && currentReader.nextKeyValue()) {
-          currentPair = nextPair();
-          return true;
-        } else {
-          while (splitsIterator.hasNext()) {
-            // advance the reader and see if it has records
-            InputSplit nextSplit = splitsIterator.next();
-            @SuppressWarnings("unchecked")
-            RecordReader<K, V> reader =
-                (RecordReader<K, V>) format.createRecordReader(nextSplit, attemptContext);
-            if (currentReader != null) {
-              currentReader.close();
-            }
-            currentReader = reader;
-            currentReader.initialize(nextSplit, attemptContext);
-            if (currentReader.nextKeyValue()) {
-              currentPair = nextPair();
-              return true;
-            }
-            currentReader.close();
-            currentReader = null;
-          }
-          // either no next split or all readers were empty
-          currentPair = null;
-          return false;
-        }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new IOException(e);
-      }
-    }
-
-    @SuppressWarnings("unchecked")
-    private KV<K, V> nextPair() throws IOException, InterruptedException {
-      K key = currentReader.getCurrentKey();
-      V value = currentReader.getCurrentValue();
-      // clone Writable objects since they are reused between calls to RecordReader#nextKeyValue
-      if (key instanceof Writable) {
-        key = (K) WritableUtils.clone((Writable) key, conf);
-      }
-      if (value instanceof Writable) {
-        value = (V) WritableUtils.clone((Writable) value, conf);
-      }
-      return KV.of(key, value);
-    }
-
-    @Override
-    public KV<K, V> getCurrent() throws NoSuchElementException {
-      if (currentPair == null) {
-        throw new NoSuchElementException();
-      }
-      return currentPair;
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (currentReader != null) {
-        currentReader.close();
-        currentReader = null;
-      }
-      currentPair = null;
-    }
-
-    @Override
-    public BoundedSource<KV<K, V>> getCurrentSource() {
-      return source;
-    }
-
-    // BoundedReader
-
-    @Override
-    public Double getFractionConsumed() {
-      if (currentReader == null) {
-        return 0.0;
-      }
-      if (splits.isEmpty()) {
-        return 1.0;
-      }
-      int index = splitsIterator.previousIndex();
-      int numReaders = splits.size();
-      if (index == numReaders) {
-        return 1.0;
-      }
-      double before = 1.0 * index / numReaders;
-      double after = 1.0 * (index + 1) / numReaders;
-      Double fractionOfCurrentReader = getProgress();
-      if (fractionOfCurrentReader == null) {
-        return before;
-      }
-      return before + fractionOfCurrentReader * (after - before);
-    }
-
-    private Double getProgress() {
-      try {
-        return (double) currentReader.getProgress();
-      } catch (IOException | InterruptedException e) {
-        return null;
-      }
-    }
-
-    @Override
-    public BoundedSource<KV<K, V>> splitAtFraction(double fraction) {
-      // Not yet supported. To implement this, the sizes of the splits should be used to
-      // calculate the remaining splits that constitute the given fraction, then a
-      // new source backed by those splits should be returned.
-      return null;
-    }
-  }
-
-  /**
-   * A wrapper to allow Hadoop {@link org.apache.hadoop.mapreduce.InputSplit}s to be
-   * serialized using Java's standard serialization mechanisms. Note that the InputSplit
-   * has to be Writable (which most are).
-   */
-  public static class SerializableSplit implements Externalizable {
-    private static final long serialVersionUID = 0L;
-
-    private InputSplit split;
-
-    public SerializableSplit() {
-    }
-
-    public SerializableSplit(InputSplit split) {
-      Preconditions.checkArgument(split instanceof Writable, "Split is not writable: "
-          + split);
-      this.split = split;
-    }
-
-    public InputSplit getSplit() {
-      return split;
-    }
-
-    @Override
-    public void writeExternal(ObjectOutput out) throws IOException {
-      out.writeUTF(split.getClass().getCanonicalName());
-      ((Writable) split).write(out);
-    }
-
-    @Override
-    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-      String className = in.readUTF();
-      try {
-        split = (InputSplit) Class.forName(className).newInstance();
-        ((Writable) split).readFields(in);
-      } catch (InstantiationException | IllegalAccessException e) {
-        throw new IOException(e);
-      }
-    }
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/contrib/hadoop/src/main/java/org/apache/contrib/hadoop/WritableCoder.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop/src/main/java/org/apache/contrib/hadoop/WritableCoder.java b/contrib/hadoop/src/main/java/org/apache/contrib/hadoop/WritableCoder.java
deleted file mode 100644
index 180875d..0000000
--- a/contrib/hadoop/src/main/java/org/apache/contrib/hadoop/WritableCoder.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.hadoop;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.util.CloudObject;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.hadoop.io.Writable;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-
-/**
- * A {@code WritableCoder} is a {@link org.apache.beam.sdk.coders.Coder} for a
- * Java class that implements {@link org.apache.hadoop.io.Writable}.
- *
- * <p> To use, specify the coder type on a PCollection:
- * <pre>
- * {@code
- *   PCollection<MyRecord> records =
- *       foo.apply(...).setCoder(WritableCoder.of(MyRecord.class));
- * }
- * </pre>
- *
- * @param <T> the type of elements handled by this coder
- */
-public class WritableCoder<T extends Writable> extends StandardCoder<T> {
-  private static final long serialVersionUID = 0L;
-
-  /**
-   * Returns a {@code WritableCoder} instance for the provided element class.
-   * @param <T> the element type
-   */
-  public static <T extends Writable> WritableCoder<T> of(Class<T> clazz) {
-    return new WritableCoder<>(clazz);
-  }
-
-  @JsonCreator
-  @SuppressWarnings("unchecked")
-  public static WritableCoder<?> of(@JsonProperty("type") String classType)
-      throws ClassNotFoundException {
-    Class<?> clazz = Class.forName(classType);
-    if (!Writable.class.isAssignableFrom(clazz)) {
-      throw new ClassNotFoundException(
-          "Class " + classType + " does not implement Writable");
-    }
-    return of((Class<? extends Writable>) clazz);
-  }
-
-  private final Class<T> type;
-
-  public WritableCoder(Class<T> type) {
-    this.type = type;
-  }
-
-  @Override
-  public void encode(T value, OutputStream outStream, Context context) throws IOException {
-    value.write(new DataOutputStream(outStream));
-  }
-
-  @Override
-  public T decode(InputStream inStream, Context context) throws IOException {
-    try {
-      T t = type.newInstance();
-      t.readFields(new DataInputStream(inStream));
-      return t;
-    } catch (InstantiationException | IllegalAccessException e) {
-      throw new CoderException("unable to deserialize record", e);
-    }
-  }
-
-  @Override
-  public List<Coder<?>> getCoderArguments() {
-    return null;
-  }
-
-  @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
-    result.put("type", type.getName());
-    return result;
-  }
-
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    throw new NonDeterministicException(this,
-        "Hadoop Writable may be non-deterministic.");
-  }
-
-}
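
The coder itself moves essentially unchanged; per the diffstat above, only its package becomes `org.apache.beam.sdk.io.hdfs`. For reference, a minimal sketch of a round trip through the coder, mirroring the IntWritable case in WritableCoderTest (it assumes the SDK's CoderUtils helper is available on the classpath):

```java
import org.apache.beam.sdk.io.hdfs.WritableCoder;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.hadoop.io.IntWritable;

public class WritableCoderRoundTrip {
  public static void main(String[] args) throws Exception {
    // Encode a Hadoop Writable with WritableCoder, then decode it back.
    WritableCoder<IntWritable> coder = WritableCoder.of(IntWritable.class);
    byte[] encoded = CoderUtils.encodeToByteArray(coder, new IntWritable(42));
    IntWritable decoded = CoderUtils.decodeFromByteArray(coder, encoded);
    System.out.println(decoded.get()); // prints 42
  }
}
```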

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/contrib/hadoop/src/test/java/org/apache/contrib/hadoop/HadoopFileSourceTest.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop/src/test/java/org/apache/contrib/hadoop/HadoopFileSourceTest.java b/contrib/hadoop/src/test/java/org/apache/contrib/hadoop/HadoopFileSourceTest.java
deleted file mode 100644
index 72bd72a..0000000
--- a/contrib/hadoop/src/test/java/org/apache/contrib/hadoop/HadoopFileSourceTest.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.hadoop;
-
-import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Source;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.SourceTestUtils;
-import org.apache.beam.sdk.values.KV;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-/**
- * Tests for HadoopFileSource.
- */
-public class HadoopFileSourceTest {
-
-  Random random = new Random(0L);
-
-  @Rule
-  public TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  @Test
-  public void testFullyReadSingleFile() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0);
-    File file = createFileWithData("tmp.seq", expectedResults);
-
-    HadoopFileSource<IntWritable, Text> source =
-        HadoopFileSource.from(file.toString(), SequenceFileInputFormat.class,
-            IntWritable.class, Text.class);
-
-    assertEquals(file.length(), source.getEstimatedSizeBytes(null));
-
-    assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
-  }
-
-  @Test
-  public void testFullyReadFilePattern() throws IOException {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
-    File file1 = createFileWithData("file1", data1);
-
-    List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
-    createFileWithData("file2", data2);
-
-    List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
-    createFileWithData("file3", data3);
-
-    List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
-    createFileWithData("otherfile", data4);
-
-    HadoopFileSource<IntWritable, Text> source =
-        HadoopFileSource.from(new File(file1.getParent(), "file*").toString(),
-            SequenceFileInputFormat.class, IntWritable.class, Text.class);
-    List<KV<IntWritable, Text>> expectedResults = new ArrayList<>();
-    expectedResults.addAll(data1);
-    expectedResults.addAll(data2);
-    expectedResults.addAll(data3);
-    assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
-  }
-
-  @Test
-  public void testCloseUnstartedFilePatternReader() throws IOException {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
-    File file1 = createFileWithData("file1", data1);
-
-    List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
-    createFileWithData("file2", data2);
-
-    List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
-    createFileWithData("file3", data3);
-
-    List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
-    createFileWithData("otherfile", data4);
-
-    HadoopFileSource<IntWritable, Text> source =
-        HadoopFileSource.from(new File(file1.getParent(), "file*").toString(),
-            SequenceFileInputFormat.class, IntWritable.class, Text.class);
-    Source.Reader<KV<IntWritable, Text>> reader = source.createReader(options);
-    // Closing an unstarted FilePatternReader should not throw an exception.
-    try {
-      reader.close();
-    } catch (Exception e) {
-      fail("Closing an unstarted FilePatternReader should not throw an exception");
-    }
-  }
-
-  @Test
-  public void testSplits() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-
-    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
-    File file = createFileWithData("tmp.avro", expectedResults);
-
-    HadoopFileSource<IntWritable, Text> source =
-        HadoopFileSource.from(file.toString(), SequenceFileInputFormat.class,
-            IntWritable.class, Text.class);
-
-    // Assert that the source produces the expected records
-    assertEquals(expectedResults, readFromSource(source, options));
-
-    // Split with a small bundle size (has to be at least size of sync interval)
-    List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source
-        .splitIntoBundles(SequenceFile.SYNC_INTERVAL, options);
-    assertTrue(splits.size() > 2);
-    SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
-    int nonEmptySplits = 0;
-    for (BoundedSource<KV<IntWritable, Text>> subSource : splits) {
-      if (readFromSource(subSource, options).size() > 0) {
-        nonEmptySplits += 1;
-      }
-    }
-    assertTrue(nonEmptySplits > 2);
-  }
-
-  private File createFileWithData(String filename, List<KV<IntWritable, Text>> records)
-      throws IOException {
-    File tmpFile = tmpFolder.newFile(filename);
-    try (Writer writer = SequenceFile.createWriter(new Configuration(),
-          Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
-          Writer.file(new Path(tmpFile.toURI())))) {
-
-      for (KV<IntWritable, Text> record : records) {
-        writer.append(record.getKey(), record.getValue());
-      }
-    }
-    return tmpFile;
-  }
-
-  private List<KV<IntWritable, Text>> createRandomRecords(int dataItemLength,
-      int numItems, int offset) {
-    List<KV<IntWritable, Text>> records = new ArrayList<>();
-    for (int i = 0; i < numItems; i++) {
-      IntWritable key = new IntWritable(i + offset);
-      Text value = new Text(createRandomString(dataItemLength));
-      records.add(KV.of(key, value));
-    }
-    return records;
-  }
-
-  private String createRandomString(int length) {
-    char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
-    StringBuilder builder = new StringBuilder();
-    for (int i = 0; i < length; i++) {
-      builder.append(chars[random.nextInt(chars.length)]);
-    }
-    return builder.toString();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/contrib/hadoop/src/test/java/org/apache/contrib/hadoop/WritableCoderTest.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop/src/test/java/org/apache/contrib/hadoop/WritableCoderTest.java b/contrib/hadoop/src/test/java/org/apache/contrib/hadoop/WritableCoderTest.java
deleted file mode 100644
index 368b682..0000000
--- a/contrib/hadoop/src/test/java/org/apache/contrib/hadoop/WritableCoderTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.hadoop;
-
-import org.apache.beam.sdk.testing.CoderProperties;
-
-import org.apache.hadoop.io.IntWritable;
-import org.junit.Test;
-
-/**
- * Tests for WritableCoder.
- */
-public class WritableCoderTest {
-
-  @Test
-  public void testIntWritableEncoding() throws Exception {
-    IntWritable value = new IntWritable(42);
-    WritableCoder<IntWritable> coder = WritableCoder.of(IntWritable.class);
-
-    CoderProperties.coderDecodeEncodeEqual(coder, value);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/sdks/java/io/hdfs/README.md
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/README.md b/sdks/java/io/hdfs/README.md
new file mode 100644
index 0000000..7149cda
--- /dev/null
+++ b/sdks/java/io/hdfs/README.md
@@ -0,0 +1,24 @@
+# HDFS IO
+
+This library provides HDFS sources and sinks to make it possible to read and
+write Apache Hadoop file formats from Apache Beam pipelines.
+
+Currently, only the read path is implemented. An `HDFSFileSource` allows any
+Hadoop `FileInputFormat` to be read as a `PCollection`.
+
+An `HDFSFileSource` can be read from using the
+`org.apache.beam.sdk.io.Read` transform. For example:
+
+```java
+HDFSFileSource<MyKey, MyValue> source = HDFSFileSource.from(path, MyInputFormat.class,
+  MyKey.class, MyValue.class);
+PCollection<KV<MyKey, MyValue>> records = pipeline.apply(Read.from(source));
+```
+
+Alternatively, the `readFrom` method is a convenience method that returns a read
+transform. For example:
+
+```java
+PCollection<KV<MyKey, MyValue>> records = pipeline.apply(HDFSFileSource.readFrom(path,
+  MyInputFormat.class, MyKey.class, MyValue.class));
+```
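
For context beyond the README snippets, a minimal end-to-end sketch of reading SequenceFile records through the relocated source. It is a sketch only: the pipeline driver and the HDFS path are assumed placeholders, while the key/value types (IntWritable/Text with SequenceFileInputFormat) follow HDFSFileSourceTest:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.hdfs.HDFSFileSource;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

public class HdfsReadExample {
  public static void main(String[] args) {
    // Hypothetical driver; the HDFS path below is a placeholder.
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // readFrom(...) wraps HDFSFileSource.from(...) in a Read.Bounded transform,
    // so it can be applied directly to the pipeline.
    PCollection<KV<IntWritable, Text>> records = p.apply(
        HDFSFileSource.readFrom("hdfs://namenode/data/part-*",
            SequenceFileInputFormat.class, IntWritable.class, Text.class));

    p.run();
  }
}
```

Because hadoop-client is a provided dependency of the module (see the pom below), the Hadoop client jars matching the target cluster are expected to come from the user's own project.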

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
new file mode 100644
index 0000000..3eaef62
--- /dev/null
+++ b/sdks/java/io/hdfs/pom.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>io-parent</artifactId>
+    <version>0.1.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>hdfs</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: HDFS</name>
+  <description>Library to read and write Hadoop/HDFS file formats from Beam.</description>
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>java-sdk-all</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>2.7.0</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <version>1.3</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.11</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
new file mode 100644
index 0000000..ab537eb
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.values.KV;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+
+/**
+ * A {@code BoundedSource} for reading files resident in a Hadoop filesystem (HDFS) using a
+ * Hadoop file-based input format.
+ *
+ * <p>To read a {@link org.apache.beam.sdk.values.PCollection} of
+ * {@link org.apache.beam.sdk.values.KV} key-value pairs from one or more
+ * Hadoop files, use {@link HDFSFileSource#from} to specify the path(s) of the files to
+ * read, the Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}, the
+ * key class and the value class.
+ *
+ * <p>An {@code HDFSFileSource} can be read from using the
+ * {@link org.apache.beam.sdk.io.Read} transform. For example:
+ *
+ * <pre>
+ * {@code
+ * HDFSFileSource<MyKey, MyValue> source = HDFSFileSource.from(path, MyInputFormat.class,
+ *   MyKey.class, MyValue.class);
+ * PCollection<KV<MyKey, MyValue>> records = pipeline.apply(Read.from(source));
+ * }
+ * </pre>
+ *
+ * <p>The {@link HDFSFileSource#readFrom} method is a convenience method
+ * that returns a read transform. For example:
+ *
+ * <pre>
+ * {@code
+ * PCollection<KV<MyKey, MyValue>> records = pipeline.apply(HDFSFileSource.readFrom(path,
+ *   MyInputFormat.class, MyKey.class, MyValue.class));
+ * }
+ * </pre>
+ *
+ * Implementation note: Since Hadoop's {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}
+ * determines the input splits, this class extends {@link BoundedSource} rather than
+ * {@link org.apache.beam.sdk.io.OffsetBasedSource}, since the latter
+ * dictates input splits.
+
+ * @param <K> The type of keys to be read from the source.
+ * @param <V> The type of values to be read from the source.
+ */
+public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
+  private static final long serialVersionUID = 0L;
+
+  private final String filepattern;
+  private final Class<? extends FileInputFormat<?, ?>> formatClass;
+  private final Class<K> keyClass;
+  private final Class<V> valueClass;
+  private final SerializableSplit serializableSplit;
+
+  /**
+   * Creates a {@code Read} transform that will read from an {@code HDFSFileSource}
+   * with the given file name or pattern ("glob") using the given Hadoop
+   * {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat},
+   * with key-value types specified by the given key class and value class.
+   */
+  public static <K, V, T extends FileInputFormat<K, V>> Read.Bounded<KV<K, V>> readFrom(
+      String filepattern, Class<T> formatClass, Class<K> keyClass, Class<V> valueClass) {
+    return Read.from(from(filepattern, formatClass, keyClass, valueClass));
+  }
+
+  /**
+   * Creates a {@code HDFSFileSource} that reads from the given file name or pattern ("glob")
+   * using the given Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat},
+   * with key-value types specified by the given key class and value class.
+   */
+  public static <K, V, T extends FileInputFormat<K, V>> HDFSFileSource<K, V> from(
+      String filepattern, Class<T> formatClass, Class<K> keyClass, Class<V> valueClass) {
+    @SuppressWarnings("unchecked")
+    HDFSFileSource<K, V> source = (HDFSFileSource<K, V>)
+        new HDFSFileSource(filepattern, formatClass, keyClass, valueClass);
+    return source;
+  }
+
+  /**
+   * Create a {@code HDFSFileSource} based on a file or a file pattern specification.
+   */
+  private HDFSFileSource(String filepattern,
+                         Class<? extends FileInputFormat<?, ?>> formatClass, Class<K> keyClass,
+                         Class<V> valueClass) {
+    this(filepattern, formatClass, keyClass, valueClass, null);
+  }
+
+  /**
+   * Create a {@code HDFSFileSource} based on a single Hadoop input split, which won't be
+   * split up further.
+   */
+  private HDFSFileSource(String filepattern,
+                         Class<? extends FileInputFormat<?, ?>> formatClass, Class<K> keyClass,
+                         Class<V> valueClass, SerializableSplit serializableSplit) {
+    this.filepattern = filepattern;
+    this.formatClass = formatClass;
+    this.keyClass = keyClass;
+    this.valueClass = valueClass;
+    this.serializableSplit = serializableSplit;
+  }
+
+  public String getFilepattern() {
+    return filepattern;
+  }
+
+  public Class<? extends FileInputFormat<?, ?>> getFormatClass() {
+    return formatClass;
+  }
+
+  public Class<K> getKeyClass() {
+    return keyClass;
+  }
+
+  public Class<V> getValueClass() {
+    return valueClass;
+  }
+
+  @Override
+  public void validate() {
+    Preconditions.checkNotNull(filepattern,
+        "need to set the filepattern of a HDFSFileSource");
+    Preconditions.checkNotNull(formatClass,
+        "need to set the format class of a HDFSFileSource");
+    Preconditions.checkNotNull(keyClass,
+        "need to set the key class of a HDFSFileSource");
+    Preconditions.checkNotNull(valueClass,
+        "need to set the value class of a HDFSFileSource");
+  }
+
+  @Override
+  public List<? extends BoundedSource<KV<K, V>>> splitIntoBundles(long desiredBundleSizeBytes,
+      PipelineOptions options) throws Exception {
+    if (serializableSplit == null) {
+      return Lists.transform(computeSplits(desiredBundleSizeBytes),
+          new Function<InputSplit, BoundedSource<KV<K, V>>>() {
+        @Nullable @Override
+        public BoundedSource<KV<K, V>> apply(@Nullable InputSplit inputSplit) {
+          return new HDFSFileSource<K, V>(filepattern, formatClass, keyClass,
+              valueClass, new SerializableSplit(inputSplit));
+        }
+      });
+    } else {
+      return ImmutableList.of(this);
+    }
+  }
+
+  private FileInputFormat<?, ?> createFormat(Job job) throws IOException, IllegalAccessException,
+      InstantiationException {
+    Path path = new Path(filepattern);
+    FileInputFormat.addInputPath(job, path);
+    return formatClass.newInstance();
+  }
+
+  private List<InputSplit> computeSplits(long desiredBundleSizeBytes) throws IOException,
+      IllegalAccessException, InstantiationException {
+    Job job = Job.getInstance();
+    FileInputFormat.setMinInputSplitSize(job, desiredBundleSizeBytes);
+    FileInputFormat.setMaxInputSplitSize(job, desiredBundleSizeBytes);
+    return createFormat(job).getSplits(job);
+  }
+
+  @Override
+  public BoundedReader<KV<K, V>> createReader(PipelineOptions options) throws IOException {
+    this.validate();
+
+    if (serializableSplit == null) {
+      return new HDFSFileReader<>(this, filepattern, formatClass);
+    } else {
+      return new HDFSFileReader<>(this, filepattern, formatClass,
+          serializableSplit.getSplit());
+    }
+  }
+
+  @Override
+  public Coder<KV<K, V>> getDefaultOutputCoder() {
+    return KvCoder.of(getDefaultCoder(keyClass), getDefaultCoder(valueClass));
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T> Coder<T> getDefaultCoder(Class<T> c) {
+    if (Writable.class.isAssignableFrom(c)) {
+      Class<? extends Writable> writableClass = (Class<? extends Writable>) c;
+      return (Coder<T>) WritableCoder.of(writableClass);
+    } else if (Void.class.equals(c)) {
+      return (Coder<T>) VoidCoder.of();
+    }
+    // TODO: how to use registered coders here?
+    throw new IllegalStateException("Cannot find coder for " + c);
+  }
+
+  // BoundedSource
+
+  @Override
+  public long getEstimatedSizeBytes(PipelineOptions options) {
+    long size = 0;
+    try {
+      Job job = Job.getInstance(); // new instance
+      for (FileStatus st : listStatus(createFormat(job), job)) {
+        size += st.getLen();
+      }
+    } catch (IOException | NoSuchMethodException | InvocationTargetException
+        | IllegalAccessException | InstantiationException e) {
+      // ignore, and return 0
+    }
+    return size;
+  }
+
+  private <K, V> List<FileStatus> listStatus(FileInputFormat<K, V> format,
+      JobContext jobContext) throws NoSuchMethodException, InvocationTargetException,
+      IllegalAccessException {
+    // FileInputFormat#listStatus is protected, so call using reflection
+    Method listStatus = FileInputFormat.class.getDeclaredMethod("listStatus", JobContext.class);
+    listStatus.setAccessible(true);
+    @SuppressWarnings("unchecked")
+    List<FileStatus> stat = (List<FileStatus>) listStatus.invoke(format, jobContext);
+    return stat;
+  }
+
+  @Override
+  public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+    return false;
+  }
+
+  static class HDFSFileReader<K, V> extends BoundedSource.BoundedReader<KV<K, V>> {
+
+    private final BoundedSource<KV<K, V>> source;
+    private final String filepattern;
+    private final Class formatClass;
+
+    private FileInputFormat<?, ?> format;
+    private TaskAttemptContext attemptContext;
+    private List<InputSplit> splits;
+    private ListIterator<InputSplit> splitsIterator;
+    private Configuration conf;
+    private RecordReader<K, V> currentReader;
+    private KV<K, V> currentPair;
+
+    /**
+     * Create a {@code HDFSFileReader} based on a file or a file pattern specification.
+     */
+    public HDFSFileReader(BoundedSource<KV<K, V>> source, String filepattern,
+                          Class<? extends FileInputFormat<?, ?>> formatClass) {
+      this(source, filepattern, formatClass, null);
+    }
+
+    /**
+     * Create a {@code HDFSFileReader} based on a single Hadoop input split.
+     */
+    public HDFSFileReader(BoundedSource<KV<K, V>> source, String filepattern,
+                          Class<? extends FileInputFormat<?, ?>> formatClass, InputSplit split) {
+      this.source = source;
+      this.filepattern = filepattern;
+      this.formatClass = formatClass;
+      if (split != null) {
+        this.splits = ImmutableList.of(split);
+        this.splitsIterator = splits.listIterator();
+      }
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      Job job = Job.getInstance(); // new instance
+      Path path = new Path(filepattern);
+      FileInputFormat.addInputPath(job, path);
+
+      try {
+        @SuppressWarnings("unchecked")
+        FileInputFormat<K, V> f = (FileInputFormat<K, V>) formatClass.newInstance();
+        this.format = f;
+      } catch (InstantiationException | IllegalAccessException e) {
+        throw new IOException("Cannot instantiate file input format " + formatClass, e);
+      }
+      this.attemptContext = new TaskAttemptContextImpl(job.getConfiguration(),
+          new TaskAttemptID());
+
+      if (splitsIterator == null) {
+        this.splits = format.getSplits(job);
+        this.splitsIterator = splits.listIterator();
+      }
+      this.conf = job.getConfiguration();
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      try {
+        if (currentReader != null && currentReader.nextKeyValue()) {
+          currentPair = nextPair();
+          return true;
+        } else {
+          while (splitsIterator.hasNext()) {
+            // advance the reader and see if it has records
+            InputSplit nextSplit = splitsIterator.next();
+            @SuppressWarnings("unchecked")
+            RecordReader<K, V> reader =
+                (RecordReader<K, V>) format.createRecordReader(nextSplit, attemptContext);
+            if (currentReader != null) {
+              currentReader.close();
+            }
+            currentReader = reader;
+            currentReader.initialize(nextSplit, attemptContext);
+            if (currentReader.nextKeyValue()) {
+              currentPair = nextPair();
+              return true;
+            }
+            currentReader.close();
+            currentReader = null;
+          }
+          // either no next split or all readers were empty
+          currentPair = null;
+          return false;
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException(e);
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    private KV<K, V> nextPair() throws IOException, InterruptedException {
+      K key = currentReader.getCurrentKey();
+      V value = currentReader.getCurrentValue();
+      // clone Writable objects since they are reused between calls to RecordReader#nextKeyValue
+      if (key instanceof Writable) {
+        key = (K) WritableUtils.clone((Writable) key, conf);
+      }
+      if (value instanceof Writable) {
+        value = (V) WritableUtils.clone((Writable) value, conf);
+      }
+      return KV.of(key, value);
+    }
+
+    @Override
+    public KV<K, V> getCurrent() throws NoSuchElementException {
+      if (currentPair == null) {
+        throw new NoSuchElementException();
+      }
+      return currentPair;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (currentReader != null) {
+        currentReader.close();
+        currentReader = null;
+      }
+      currentPair = null;
+    }
+
+    @Override
+    public BoundedSource<KV<K, V>> getCurrentSource() {
+      return source;
+    }
+
+    // BoundedReader
+
+    @Override
+    public Double getFractionConsumed() {
+      if (currentReader == null) {
+        return 0.0;
+      }
+      if (splits.isEmpty()) {
+        return 1.0;
+      }
+      int index = splitsIterator.previousIndex();
+      int numReaders = splits.size();
+      if (index == numReaders) {
+        return 1.0;
+      }
+      double before = 1.0 * index / numReaders;
+      double after = 1.0 * (index + 1) / numReaders;
+      Double fractionOfCurrentReader = getProgress();
+      if (fractionOfCurrentReader == null) {
+        return before;
+      }
+      return before + fractionOfCurrentReader * (after - before);
+    }
+
+    private Double getProgress() {
+      try {
+        return (double) currentReader.getProgress();
+      } catch (IOException | InterruptedException e) {
+        return null;
+      }
+    }
+
+    @Override
+    public BoundedSource<KV<K, V>> splitAtFraction(double fraction) {
+      // Not yet supported. To implement this, the sizes of the splits should be used to
+      // calculate the remaining splits that constitute the given fraction, then a
+      // new source backed by those splits should be returned.
+      return null;
+    }
+  }
+
+  /**
+   * A wrapper to allow Hadoop {@link org.apache.hadoop.mapreduce.InputSplit}s to be
+   * serialized using Java's standard serialization mechanisms. Note that the InputSplit
+   * has to be Writable (which most are).
+   */
+  public static class SerializableSplit implements Externalizable {
+    private static final long serialVersionUID = 0L;
+
+    private InputSplit split;
+
+    public SerializableSplit() {
+    }
+
+    public SerializableSplit(InputSplit split) {
+      Preconditions.checkArgument(split instanceof Writable, "Split is not writable: "
+          + split);
+      this.split = split;
+    }
+
+    public InputSplit getSplit() {
+      return split;
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+      out.writeUTF(split.getClass().getCanonicalName());
+      ((Writable) split).write(out);
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+      String className = in.readUTF();
+      try {
+        split = (InputSplit) Class.forName(className).newInstance();
+        ((Writable) split).readFields(in);
+      } catch (InstantiationException | IllegalAccessException e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
+
+}

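[Editor's note] For readers following the move, here is a minimal read sketch against the relocated HDFSFileSource. It is not part of the commit; it reuses the SequenceFile types exercised in the test below, the path and class name are hypothetical, and it assumes the standard Read transform and PipelineOptionsFactory from the Beam Java SDK of this era.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.hdfs.HDFSFileSource;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

public class ReadFromHdfsExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Bounded source over a file or glob; key/value classes match the input format.
    // The path below is a hypothetical placeholder.
    HDFSFileSource<IntWritable, Text> source = HDFSFileSource.from(
        "hdfs://namenode/path/part-*",
        SequenceFileInputFormat.class, IntWritable.class, Text.class);

    // Materialize the records as a PCollection of key/value pairs.
    PCollection<KV<IntWritable, Text>> records = p.apply(Read.from(source));

    p.run();
  }
}
```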
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
new file mode 100644
index 0000000..814a762
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.util.CloudObject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.io.Writable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+/**
+ * A {@code WritableCoder} is a {@link org.apache.beam.sdk.coders.Coder} for a
+ * Java class that implements {@link org.apache.hadoop.io.Writable}.
+ *
+ * <p> To use, specify the coder type on a PCollection:
+ * <pre>
+ * {@code
+ *   PCollection<MyRecord> records =
+ *       foo.apply(...).setCoder(WritableCoder.of(MyRecord.class));
+ * }
+ * </pre>
+ *
+ * @param <T> the type of elements handled by this coder
+ */
+public class WritableCoder<T extends Writable> extends StandardCoder<T> {
+  private static final long serialVersionUID = 0L;
+
+  /**
+   * Returns a {@code WritableCoder} instance for the provided element class.
+   * @param <T> the element type
+   */
+  public static <T extends Writable> WritableCoder<T> of(Class<T> clazz) {
+    return new WritableCoder<>(clazz);
+  }
+
+  @JsonCreator
+  @SuppressWarnings("unchecked")
+  public static WritableCoder<?> of(@JsonProperty("type") String classType)
+      throws ClassNotFoundException {
+    Class<?> clazz = Class.forName(classType);
+    if (!Writable.class.isAssignableFrom(clazz)) {
+      throw new ClassNotFoundException(
+          "Class " + classType + " does not implement Writable");
+    }
+    return of((Class<? extends Writable>) clazz);
+  }
+
+  private final Class<T> type;
+
+  public WritableCoder(Class<T> type) {
+    this.type = type;
+  }
+
+  @Override
+  public void encode(T value, OutputStream outStream, Context context) throws IOException {
+    value.write(new DataOutputStream(outStream));
+  }
+
+  @Override
+  public T decode(InputStream inStream, Context context) throws IOException {
+    try {
+      T t = type.newInstance();
+      t.readFields(new DataInputStream(inStream));
+      return t;
+    } catch (InstantiationException | IllegalAccessException e) {
+      throw new CoderException("unable to deserialize record", e);
+    }
+  }
+
+  @Override
+  public List<Coder<?>> getCoderArguments() {
+    return null;
+  }
+
+  @Override
+  public CloudObject asCloudObject() {
+    CloudObject result = super.asCloudObject();
+    result.put("type", type.getName());
+    return result;
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    throw new NonDeterministicException(this,
+        "Hadoop Writable may be non-deterministic.");
+  }
+
+}

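[Editor's note] A quick round-trip sketch of the coder above, outside any pipeline. It is not part of the commit and assumes the Coder.Context constants exposed by the SDK at this version; the behavior shown (write via Writable#write, decode via reflective newInstance plus readFields) follows directly from the encode/decode methods in the file.

```java
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.hdfs.WritableCoder;
import org.apache.hadoop.io.IntWritable;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

public class WritableCoderRoundTrip {
  public static void main(String[] args) throws Exception {
    WritableCoder<IntWritable> coder = WritableCoder.of(IntWritable.class);

    // Encode an IntWritable using its own Writable serialization.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    coder.encode(new IntWritable(42), out, Coder.Context.OUTER);

    // Decode it back; the coder instantiates the class reflectively and calls readFields.
    IntWritable copy = coder.decode(
        new ByteArrayInputStream(out.toByteArray()), Coder.Context.OUTER);
    System.out.println(copy.get());  // prints 42
  }
}
```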
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
new file mode 100644
index 0000000..67df7bc
--- /dev/null
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.values.KV;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Tests for HDFSFileSource.
+ */
+public class HDFSFileSourceTest {
+
+  Random random = new Random(0L);
+
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Test
+  public void testFullyReadSingleFile() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0);
+    File file = createFileWithData("tmp.seq", expectedResults);
+
+    HDFSFileSource<IntWritable, Text> source =
+        HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
+            IntWritable.class, Text.class);
+
+    assertEquals(file.length(), source.getEstimatedSizeBytes(null));
+
+    assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
+  }
+
+  @Test
+  public void testFullyReadFilePattern() throws IOException {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
+    File file1 = createFileWithData("file1", data1);
+
+    List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
+    createFileWithData("file2", data2);
+
+    List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
+    createFileWithData("file3", data3);
+
+    List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
+    createFileWithData("otherfile", data4);
+
+    HDFSFileSource<IntWritable, Text> source =
+        HDFSFileSource.from(new File(file1.getParent(), "file*").toString(),
+            SequenceFileInputFormat.class, IntWritable.class, Text.class);
+    List<KV<IntWritable, Text>> expectedResults = new ArrayList<>();
+    expectedResults.addAll(data1);
+    expectedResults.addAll(data2);
+    expectedResults.addAll(data3);
+    assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
+  }
+
+  @Test
+  public void testCloseUnstartedFilePatternReader() throws IOException {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
+    File file1 = createFileWithData("file1", data1);
+
+    List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
+    createFileWithData("file2", data2);
+
+    List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
+    createFileWithData("file3", data3);
+
+    List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
+    createFileWithData("otherfile", data4);
+
+    HDFSFileSource<IntWritable, Text> source =
+        HDFSFileSource.from(new File(file1.getParent(), "file*").toString(),
+            SequenceFileInputFormat.class, IntWritable.class, Text.class);
+    Source.Reader<KV<IntWritable, Text>> reader = source.createReader(options);
+    // Closing an unstarted FilePatternReader should not throw an exception.
+    try {
+      reader.close();
+    } catch (Exception e) {
+      fail("Closing an unstarted FilePatternReader should not throw an exception");
+    }
+  }
+
+  @Test
+  public void testSplits() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
+    File file = createFileWithData("tmp.avro", expectedResults);
+
+    HDFSFileSource<IntWritable, Text> source =
+        HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
+            IntWritable.class, Text.class);
+
+    // Assert that the source produces the expected records
+    assertEquals(expectedResults, readFromSource(source, options));
+
+    // Split with a small bundle size (has to be at least size of sync interval)
+    List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source
+        .splitIntoBundles(SequenceFile.SYNC_INTERVAL, options);
+    assertTrue(splits.size() > 2);
+    SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
+    int nonEmptySplits = 0;
+    for (BoundedSource<KV<IntWritable, Text>> subSource : splits) {
+      if (readFromSource(subSource, options).size() > 0) {
+        nonEmptySplits += 1;
+      }
+    }
+    assertTrue(nonEmptySplits > 2);
+  }
+
+  private File createFileWithData(String filename, List<KV<IntWritable, Text>> records)
+      throws IOException {
+    File tmpFile = tmpFolder.newFile(filename);
+    try (Writer writer = SequenceFile.createWriter(new Configuration(),
+          Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
+          Writer.file(new Path(tmpFile.toURI())))) {
+
+      for (KV<IntWritable, Text> record : records) {
+        writer.append(record.getKey(), record.getValue());
+      }
+    }
+    return tmpFile;
+  }
+
+  private List<KV<IntWritable, Text>> createRandomRecords(int dataItemLength,
+      int numItems, int offset) {
+    List<KV<IntWritable, Text>> records = new ArrayList<>();
+    for (int i = 0; i < numItems; i++) {
+      IntWritable key = new IntWritable(i + offset);
+      Text value = new Text(createRandomString(dataItemLength));
+      records.add(KV.of(key, value));
+    }
+    return records;
+  }
+
+  private String createRandomString(int length) {
+    char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
+    StringBuilder builder = new StringBuilder();
+    for (int i = 0; i < length; i++) {
+      builder.append(chars[random.nextInt(chars.length)]);
+    }
+    return builder.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java
new file mode 100644
index 0000000..715da8e
--- /dev/null
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import org.apache.beam.sdk.testing.CoderProperties;
+
+import org.apache.hadoop.io.IntWritable;
+import org.junit.Test;
+
+/**
+ * Tests for WritableCoder.
+ */
+public class WritableCoderTest {
+
+  @Test
+  public void testIntWritableEncoding() throws Exception {
+    IntWritable value = new IntWritable(42);
+    WritableCoder<IntWritable> coder = WritableCoder.of(IntWritable.class);
+
+    CoderProperties.coderDecodeEncodeEqual(coder, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
new file mode 100644
index 0000000..75f192c
--- /dev/null
+++ b/sdks/java/io/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>java-sdk-parent</artifactId>
+    <version>0.1.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>io-parent</artifactId>
+  <packaging>pom</packaging>
+  <name>Apache Beam :: SDKs :: Java :: IO</name>
+  <description>Beam SDK Java IO provides different connectivity components
+  (sources and sinks) to consume and produce data from systems.</description>
+
+  <modules>
+    <module>hdfs</module>
+  </modules>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/sdks/java/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index 1111d92..6bd7ee7 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -36,6 +36,7 @@
 
   <modules>
     <module>core</module>
+    <module>io</module>
     <!-- sdks/java/maven-archtypes has several dependencies on the
          DataflowPipelineRunner. Until these are refactored out or
          a released artifact exists, we need to modify the build order.


[2/2] incubator-beam git commit: This closes #96

Posted by da...@apache.org.
This closes #96


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c8ed3980
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c8ed3980
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c8ed3980

Branch: refs/heads/master
Commit: c8ed3980687f22c0d2342ed7f7117b9e6550e1c6
Parents: bcefff6 404b633
Author: Davor Bonaci <da...@google.com>
Authored: Fri Apr 15 10:28:53 2016 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Fri Apr 15 10:28:53 2016 -0700

----------------------------------------------------------------------
 contrib/hadoop/README.md                        |  24 -
 contrib/hadoop/pom.xml                          | 170 -------
 .../apache/contrib/hadoop/HadoopFileSource.java | 486 -------------------
 .../apache/contrib/hadoop/WritableCoder.java    | 111 -----
 .../contrib/hadoop/HadoopFileSourceTest.java    | 190 --------
 .../contrib/hadoop/WritableCoderTest.java       |  37 --
 sdks/java/io/hdfs/README.md                     |  24 +
 sdks/java/io/hdfs/pom.xml                       |  65 +++
 .../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 486 +++++++++++++++++++
 .../apache/beam/sdk/io/hdfs/WritableCoder.java  | 111 +++++
 .../beam/sdk/io/hdfs/HDFSFileSourceTest.java    | 190 ++++++++
 .../beam/sdk/io/hdfs/WritableCoderTest.java     |  37 ++
 sdks/java/io/pom.xml                            |  41 ++
 sdks/java/pom.xml                               |   1 +
 14 files changed, 955 insertions(+), 1018 deletions(-)
----------------------------------------------------------------------