Posted to commits@beam.apache.org by da...@apache.org on 2016/04/15 19:29:19 UTC
[1/2] incubator-beam git commit: [BEAM-77] Move hadoop contrib into hdfs IO
Repository: incubator-beam
Updated Branches:
refs/heads/master bcefff6a3 -> c8ed39806
[BEAM-77] Move hadoop contrib into hdfs IO
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/404b633d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/404b633d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/404b633d
Branch: refs/heads/master
Commit: 404b633d43d23940a7f11a93a472980f7bb09ce7
Parents: bcefff6
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Fri Apr 15 08:48:10 2016 +0200
Committer: Davor Bonaci <da...@google.com>
Committed: Fri Apr 15 10:28:48 2016 -0700
----------------------------------------------------------------------
contrib/hadoop/README.md | 24 -
contrib/hadoop/pom.xml | 170 -------
.../apache/contrib/hadoop/HadoopFileSource.java | 486 -------------------
.../apache/contrib/hadoop/WritableCoder.java | 111 -----
.../contrib/hadoop/HadoopFileSourceTest.java | 190 --------
.../contrib/hadoop/WritableCoderTest.java | 37 --
sdks/java/io/hdfs/README.md | 24 +
sdks/java/io/hdfs/pom.xml | 65 +++
.../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 486 +++++++++++++++++++
.../apache/beam/sdk/io/hdfs/WritableCoder.java | 111 +++++
.../beam/sdk/io/hdfs/HDFSFileSourceTest.java | 190 ++++++++
.../beam/sdk/io/hdfs/WritableCoderTest.java | 37 ++
sdks/java/io/pom.xml | 41 ++
sdks/java/pom.xml | 1 +
14 files changed, 955 insertions(+), 1018 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/contrib/hadoop/README.md
----------------------------------------------------------------------
diff --git a/contrib/hadoop/README.md b/contrib/hadoop/README.md
deleted file mode 100644
index 49bbf98..0000000
--- a/contrib/hadoop/README.md
+++ /dev/null
@@ -1,24 +0,0 @@
-# Hadoop module
-
-This library provides Dataflow sources and sinks to make it possible to read and
-write Apache Hadoop file formats from Dataflow pipelines.
-
-Currently, only the read path is implemented. A `HadoopFileSource` allows any
-Hadoop `FileInputFormat` to be read as a `PCollection`.
-
-A `HadoopFileSource` can be read from using the
-`com.google.cloud.dataflow.sdk.io.Read` transform. For example:
-
-```java
-HadoopFileSource<K, V> source = HadoopFileSource.from(path, MyInputFormat.class,
- MyKey.class, MyValue.class);
-PCollection<KV<MyKey, MyValue>> records = Read.from(mySource);
-```
-
-Alternatively, the `readFrom` method is a convenience method that returns a read
-transform. For example:
-
-```java
-PCollection<KV<MyKey, MyValue>> records = HadoopFileSource.readFrom(path,
- MyInputFormat.class, MyKey.class, MyValue.class);
-```
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/contrib/hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/hadoop/pom.xml b/contrib/hadoop/pom.xml
deleted file mode 100644
index 24e454b..0000000
--- a/contrib/hadoop/pom.xml
+++ /dev/null
@@ -1,170 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>com.google.cloud.dataflow</groupId>
- <artifactId>google-cloud-dataflow-java-contrib-hadoop</artifactId>
- <name>Google Cloud Dataflow Hadoop Library</name>
- <description>Library to read and write Hadoop file formats from Dataflow.</description>
- <version>0.0.1-SNAPSHOT</version>
- <packaging>jar</packaging>
-
- <licenses>
- <license>
- <name>Apache License, Version 2.0</name>
- <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
- <distribution>repo</distribution>
- </license>
- </licenses>
-
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <beam-version>[0.1.0, 1.0.0)</beam-version>
- </properties>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.2</version>
- <configuration>
- <source>1.7</source>
- <target>1.7</target>
- </configuration>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <version>2.12</version>
- <dependencies>
- <dependency>
- <groupId>com.puppycrawl.tools</groupId>
- <artifactId>checkstyle</artifactId>
- <version>6.6</version>
- </dependency>
- </dependencies>
- <configuration>
- <configLocation>../../sdks/java/checkstyle.xml</configLocation>
- <consoleOutput>true</consoleOutput>
- <failOnViolation>true</failOnViolation>
- <includeTestSourceDirectory>true</includeTestSourceDirectory>
- </configuration>
- <executions>
- <execution>
- <goals>
- <goal>check</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <!-- Source plugin for generating source and test-source JARs. -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-source-plugin</artifactId>
- <version>2.4</version>
- <executions>
- <execution>
- <id>attach-sources</id>
- <phase>compile</phase>
- <goals>
- <goal>jar</goal>
- </goals>
- </execution>
- <execution>
- <id>attach-test-sources</id>
- <phase>test-compile</phase>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <configuration>
- <windowtitle>Google Cloud Dataflow Hadoop Contrib</windowtitle>
- <doctitle>Google Cloud Dataflow Hadoop Contrib</doctitle>
-
- <subpackages>com.google.cloud.dataflow.contrib.hadoop</subpackages>
- <use>false</use>
- <bottom><![CDATA[<br>]]></bottom>
-
- <offlineLinks>
- <offlineLink>
- <url>https://cloud.google.com/dataflow/java-sdk/JavaDoc/</url>
- <location>${basedir}/../../javadoc/dataflow-sdk-docs</location>
- </offlineLink>
- <offlineLink>
- <url>http://docs.guava-libraries.googlecode.com/git-history/release18/javadoc/</url>
- <location>${basedir}/../../javadoc/guava-docs</location>
- </offlineLink>
- </offlineLinks>
- </configuration>
- <executions>
- <execution>
- <goals>
- <goal>jar</goal>
- </goals>
- <phase>package</phase>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>java-sdk-all</artifactId>
- <version>${beam-version}</version>
- </dependency>
-
- <!-- @tomwhite: Hadoop doesn't have great RPC client compatibility between one version and
- another so it's common to mark the Hadoop dependency as provided and have users specify the
- version they need in their project. -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>2.7.0</version>
- <scope>provided</scope>
- </dependency>
-
- <!-- test dependencies -->
- <dependency>
- <groupId>org.hamcrest</groupId>
- <artifactId>hamcrest-all</artifactId>
- <version>1.3</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.11</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/contrib/hadoop/src/main/java/org/apache/contrib/hadoop/HadoopFileSource.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop/src/main/java/org/apache/contrib/hadoop/HadoopFileSource.java b/contrib/hadoop/src/main/java/org/apache/contrib/hadoop/HadoopFileSource.java
deleted file mode 100644
index 65862f7..0000000
--- a/contrib/hadoop/src/main/java/org/apache/contrib/hadoop/HadoopFileSource.java
+++ /dev/null
@@ -1,486 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.hadoop;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.values.KV;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-
-/**
- * A {@code BoundedSource} for reading files resident in a Hadoop filesystem using a
- * Hadoop file-based input format.
- *
- * <p>To read a {@link org.apache.beam.sdk.values.PCollection} of
- * {@link org.apache.beam.sdk.values.KV} key-value pairs from one or more
- * Hadoop files, use {@link HadoopFileSource#from} to specify the path(s) of the files to
- * read, the Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}, the
- * key class and the value class.
- *
- * <p>A {@code HadoopFileSource} can be read from using the
- * {@link org.apache.beam.sdk.io.Read} transform. For example:
- *
- * <pre>
- * {@code
- * HadoopFileSource<K, V> source = HadoopFileSource.from(path, MyInputFormat.class,
- * MyKey.class, MyValue.class);
- * PCollection<KV<MyKey, MyValue>> records = Read.from(mySource);
- * }
- * </pre>
- *
- * <p>The {@link HadoopFileSource#readFrom} method is a convenience method
- * that returns a read transform. For example:
- *
- * <pre>
- * {@code
- * PCollection<KV<MyKey, MyValue>> records = HadoopFileSource.readFrom(path,
- * MyInputFormat.class, MyKey.class, MyValue.class);
- * }
- * </pre>
- *
- * Implementation note: Since Hadoop's {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}
- * determines the input splits, this class extends {@link BoundedSource} rather than
- * {@link org.apache.beam.sdk.io.OffsetBasedSource}, since the latter
- * dictates input splits.
-
- * @param <K> The type of keys to be read from the source.
- * @param <V> The type of values to be read from the source.
- */
-public class HadoopFileSource<K, V> extends BoundedSource<KV<K, V>> {
- private static final long serialVersionUID = 0L;
-
- private final String filepattern;
- private final Class<? extends FileInputFormat<?, ?>> formatClass;
- private final Class<K> keyClass;
- private final Class<V> valueClass;
- private final SerializableSplit serializableSplit;
-
- /**
- * Creates a {@code Read} transform that will read from an {@code HadoopFileSource}
- * with the given file name or pattern ("glob") using the given Hadoop
- * {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat},
- * with key-value types specified by the given key class and value class.
- */
- public static <K, V, T extends FileInputFormat<K, V>> Read.Bounded<KV<K, V>> readFrom(
- String filepattern, Class<T> formatClass, Class<K> keyClass, Class<V> valueClass) {
- return Read.from(from(filepattern, formatClass, keyClass, valueClass));
- }
-
- /**
- * Creates a {@code HadoopFileSource} that reads from the given file name or pattern ("glob")
- * using the given Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat},
- * with key-value types specified by the given key class and value class.
- */
- public static <K, V, T extends FileInputFormat<K, V>> HadoopFileSource<K, V> from(
- String filepattern, Class<T> formatClass, Class<K> keyClass, Class<V> valueClass) {
- @SuppressWarnings("unchecked")
- HadoopFileSource<K, V> source = (HadoopFileSource<K, V>)
- new HadoopFileSource(filepattern, formatClass, keyClass, valueClass);
- return source;
- }
-
- /**
- * Create a {@code HadoopFileSource} based on a file or a file pattern specification.
- */
- private HadoopFileSource(String filepattern,
- Class<? extends FileInputFormat<?, ?>> formatClass, Class<K> keyClass,
- Class<V> valueClass) {
- this(filepattern, formatClass, keyClass, valueClass, null);
- }
-
- /**
- * Create a {@code HadoopFileSource} based on a single Hadoop input split, which won't be
- * split up further.
- */
- private HadoopFileSource(String filepattern,
- Class<? extends FileInputFormat<?, ?>> formatClass, Class<K> keyClass,
- Class<V> valueClass, SerializableSplit serializableSplit) {
- this.filepattern = filepattern;
- this.formatClass = formatClass;
- this.keyClass = keyClass;
- this.valueClass = valueClass;
- this.serializableSplit = serializableSplit;
- }
-
- public String getFilepattern() {
- return filepattern;
- }
-
- public Class<? extends FileInputFormat<?, ?>> getFormatClass() {
- return formatClass;
- }
-
- public Class<K> getKeyClass() {
- return keyClass;
- }
-
- public Class<V> getValueClass() {
- return valueClass;
- }
-
- @Override
- public void validate() {
- Preconditions.checkNotNull(filepattern,
- "need to set the filepattern of a HadoopFileSource");
- Preconditions.checkNotNull(formatClass,
- "need to set the format class of a HadoopFileSource");
- Preconditions.checkNotNull(keyClass,
- "need to set the key class of a HadoopFileSource");
- Preconditions.checkNotNull(valueClass,
- "need to set the value class of a HadoopFileSource");
- }
-
- @Override
- public List<? extends BoundedSource<KV<K, V>>> splitIntoBundles(long desiredBundleSizeBytes,
- PipelineOptions options) throws Exception {
- if (serializableSplit == null) {
- return Lists.transform(computeSplits(desiredBundleSizeBytes),
- new Function<InputSplit, BoundedSource<KV<K, V>>>() {
- @Nullable @Override
- public BoundedSource<KV<K, V>> apply(@Nullable InputSplit inputSplit) {
- return new HadoopFileSource<K, V>(filepattern, formatClass, keyClass,
- valueClass, new SerializableSplit(inputSplit));
- }
- });
- } else {
- return ImmutableList.of(this);
- }
- }
-
- private FileInputFormat<?, ?> createFormat(Job job) throws IOException, IllegalAccessException,
- InstantiationException {
- Path path = new Path(filepattern);
- FileInputFormat.addInputPath(job, path);
- return formatClass.newInstance();
- }
-
- private List<InputSplit> computeSplits(long desiredBundleSizeBytes) throws IOException,
- IllegalAccessException, InstantiationException {
- Job job = Job.getInstance();
- FileInputFormat.setMinInputSplitSize(job, desiredBundleSizeBytes);
- FileInputFormat.setMaxInputSplitSize(job, desiredBundleSizeBytes);
- return createFormat(job).getSplits(job);
- }
-
- @Override
- public BoundedReader<KV<K, V>> createReader(PipelineOptions options) throws IOException {
- this.validate();
-
- if (serializableSplit == null) {
- return new HadoopFileReader<>(this, filepattern, formatClass);
- } else {
- return new HadoopFileReader<>(this, filepattern, formatClass,
- serializableSplit.getSplit());
- }
- }
-
- @Override
- public Coder<KV<K, V>> getDefaultOutputCoder() {
- return KvCoder.of(getDefaultCoder(keyClass), getDefaultCoder(valueClass));
- }
-
- @SuppressWarnings("unchecked")
- private <T> Coder<T> getDefaultCoder(Class<T> c) {
- if (Writable.class.isAssignableFrom(c)) {
- Class<? extends Writable> writableClass = (Class<? extends Writable>) c;
- return (Coder<T>) WritableCoder.of(writableClass);
- } else if (Void.class.equals(c)) {
- return (Coder<T>) VoidCoder.of();
- }
- // TODO: how to use registered coders here?
- throw new IllegalStateException("Cannot find coder for " + c);
- }
-
- // BoundedSource
-
- @Override
- public long getEstimatedSizeBytes(PipelineOptions options) {
- long size = 0;
- try {
- Job job = Job.getInstance(); // new instance
- for (FileStatus st : listStatus(createFormat(job), job)) {
- size += st.getLen();
- }
- } catch (IOException | NoSuchMethodException | InvocationTargetException
- | IllegalAccessException | InstantiationException e) {
- // ignore, and return 0
- }
- return size;
- }
-
- private <K, V> List<FileStatus> listStatus(FileInputFormat<K, V> format,
- JobContext jobContext) throws NoSuchMethodException, InvocationTargetException,
- IllegalAccessException {
- // FileInputFormat#listStatus is protected, so call using reflection
- Method listStatus = FileInputFormat.class.getDeclaredMethod("listStatus", JobContext.class);
- listStatus.setAccessible(true);
- @SuppressWarnings("unchecked")
- List<FileStatus> stat = (List<FileStatus>) listStatus.invoke(format, jobContext);
- return stat;
- }
-
- @Override
- public boolean producesSortedKeys(PipelineOptions options) throws Exception {
- return false;
- }
-
- static class HadoopFileReader<K, V> extends BoundedSource.BoundedReader<KV<K, V>> {
-
- private final BoundedSource<KV<K, V>> source;
- private final String filepattern;
- private final Class formatClass;
-
- private FileInputFormat<?, ?> format;
- private TaskAttemptContext attemptContext;
- private List<InputSplit> splits;
- private ListIterator<InputSplit> splitsIterator;
- private Configuration conf;
- private RecordReader<K, V> currentReader;
- private KV<K, V> currentPair;
-
- /**
- * Create a {@code HadoopFileReader} based on a file or a file pattern specification.
- */
- public HadoopFileReader(BoundedSource<KV<K, V>> source, String filepattern,
- Class<? extends FileInputFormat<?, ?>> formatClass) {
- this(source, filepattern, formatClass, null);
- }
-
- /**
- * Create a {@code HadoopFileReader} based on a single Hadoop input split.
- */
- public HadoopFileReader(BoundedSource<KV<K, V>> source, String filepattern,
- Class<? extends FileInputFormat<?, ?>> formatClass, InputSplit split) {
- this.source = source;
- this.filepattern = filepattern;
- this.formatClass = formatClass;
- if (split != null) {
- this.splits = ImmutableList.of(split);
- this.splitsIterator = splits.listIterator();
- }
- }
-
- @Override
- public boolean start() throws IOException {
- Job job = Job.getInstance(); // new instance
- Path path = new Path(filepattern);
- FileInputFormat.addInputPath(job, path);
-
- try {
- @SuppressWarnings("unchecked")
- FileInputFormat<K, V> f = (FileInputFormat<K, V>) formatClass.newInstance();
- this.format = f;
- } catch (InstantiationException | IllegalAccessException e) {
- throw new IOException("Cannot instantiate file input format " + formatClass, e);
- }
- this.attemptContext = new TaskAttemptContextImpl(job.getConfiguration(),
- new TaskAttemptID());
-
- if (splitsIterator == null) {
- this.splits = format.getSplits(job);
- this.splitsIterator = splits.listIterator();
- }
- this.conf = job.getConfiguration();
- return advance();
- }
-
- @Override
- public boolean advance() throws IOException {
- try {
- if (currentReader != null && currentReader.nextKeyValue()) {
- currentPair = nextPair();
- return true;
- } else {
- while (splitsIterator.hasNext()) {
- // advance the reader and see if it has records
- InputSplit nextSplit = splitsIterator.next();
- @SuppressWarnings("unchecked")
- RecordReader<K, V> reader =
- (RecordReader<K, V>) format.createRecordReader(nextSplit, attemptContext);
- if (currentReader != null) {
- currentReader.close();
- }
- currentReader = reader;
- currentReader.initialize(nextSplit, attemptContext);
- if (currentReader.nextKeyValue()) {
- currentPair = nextPair();
- return true;
- }
- currentReader.close();
- currentReader = null;
- }
- // either no next split or all readers were empty
- currentPair = null;
- return false;
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException(e);
- }
- }
-
- @SuppressWarnings("unchecked")
- private KV<K, V> nextPair() throws IOException, InterruptedException {
- K key = currentReader.getCurrentKey();
- V value = currentReader.getCurrentValue();
- // clone Writable objects since they are reused between calls to RecordReader#nextKeyValue
- if (key instanceof Writable) {
- key = (K) WritableUtils.clone((Writable) key, conf);
- }
- if (value instanceof Writable) {
- value = (V) WritableUtils.clone((Writable) value, conf);
- }
- return KV.of(key, value);
- }
-
- @Override
- public KV<K, V> getCurrent() throws NoSuchElementException {
- if (currentPair == null) {
- throw new NoSuchElementException();
- }
- return currentPair;
- }
-
- @Override
- public void close() throws IOException {
- if (currentReader != null) {
- currentReader.close();
- currentReader = null;
- }
- currentPair = null;
- }
-
- @Override
- public BoundedSource<KV<K, V>> getCurrentSource() {
- return source;
- }
-
- // BoundedReader
-
- @Override
- public Double getFractionConsumed() {
- if (currentReader == null) {
- return 0.0;
- }
- if (splits.isEmpty()) {
- return 1.0;
- }
- int index = splitsIterator.previousIndex();
- int numReaders = splits.size();
- if (index == numReaders) {
- return 1.0;
- }
- double before = 1.0 * index / numReaders;
- double after = 1.0 * (index + 1) / numReaders;
- Double fractionOfCurrentReader = getProgress();
- if (fractionOfCurrentReader == null) {
- return before;
- }
- return before + fractionOfCurrentReader * (after - before);
- }
-
- private Double getProgress() {
- try {
- return (double) currentReader.getProgress();
- } catch (IOException | InterruptedException e) {
- return null;
- }
- }
-
- @Override
- public BoundedSource<KV<K, V>> splitAtFraction(double fraction) {
- // Not yet supported. To implement this, the sizes of the splits should be used to
- // calculate the remaining splits that constitute the given fraction, then a
- // new source backed by those splits should be returned.
- return null;
- }
- }
-
- /**
- * A wrapper to allow Hadoop {@link org.apache.hadoop.mapreduce.InputSplit}s to be
- * serialized using Java's standard serialization mechanisms. Note that the InputSplit
- * has to be Writable (which most are).
- */
- public static class SerializableSplit implements Externalizable {
- private static final long serialVersionUID = 0L;
-
- private InputSplit split;
-
- public SerializableSplit() {
- }
-
- public SerializableSplit(InputSplit split) {
- Preconditions.checkArgument(split instanceof Writable, "Split is not writable: "
- + split);
- this.split = split;
- }
-
- public InputSplit getSplit() {
- return split;
- }
-
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeUTF(split.getClass().getCanonicalName());
- ((Writable) split).write(out);
- }
-
- @Override
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- String className = in.readUTF();
- try {
- split = (InputSplit) Class.forName(className).newInstance();
- ((Writable) split).readFields(in);
- } catch (InstantiationException | IllegalAccessException e) {
- throw new IOException(e);
- }
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/contrib/hadoop/src/main/java/org/apache/contrib/hadoop/WritableCoder.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop/src/main/java/org/apache/contrib/hadoop/WritableCoder.java b/contrib/hadoop/src/main/java/org/apache/contrib/hadoop/WritableCoder.java
deleted file mode 100644
index 180875d..0000000
--- a/contrib/hadoop/src/main/java/org/apache/contrib/hadoop/WritableCoder.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.hadoop;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.util.CloudObject;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.hadoop.io.Writable;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-
-/**
- * A {@code WritableCoder} is a {@link org.apache.beam.sdk.coders.Coder} for a
- * Java class that implements {@link org.apache.hadoop.io.Writable}.
- *
- * <p> To use, specify the coder type on a PCollection:
- * <pre>
- * {@code
- * PCollection<MyRecord> records =
- * foo.apply(...).setCoder(WritableCoder.of(MyRecord.class));
- * }
- * </pre>
- *
- * @param <T> the type of elements handled by this coder
- */
-public class WritableCoder<T extends Writable> extends StandardCoder<T> {
- private static final long serialVersionUID = 0L;
-
- /**
- * Returns a {@code WritableCoder} instance for the provided element class.
- * @param <T> the element type
- */
- public static <T extends Writable> WritableCoder<T> of(Class<T> clazz) {
- return new WritableCoder<>(clazz);
- }
-
- @JsonCreator
- @SuppressWarnings("unchecked")
- public static WritableCoder<?> of(@JsonProperty("type") String classType)
- throws ClassNotFoundException {
- Class<?> clazz = Class.forName(classType);
- if (!Writable.class.isAssignableFrom(clazz)) {
- throw new ClassNotFoundException(
- "Class " + classType + " does not implement Writable");
- }
- return of((Class<? extends Writable>) clazz);
- }
-
- private final Class<T> type;
-
- public WritableCoder(Class<T> type) {
- this.type = type;
- }
-
- @Override
- public void encode(T value, OutputStream outStream, Context context) throws IOException {
- value.write(new DataOutputStream(outStream));
- }
-
- @Override
- public T decode(InputStream inStream, Context context) throws IOException {
- try {
- T t = type.newInstance();
- t.readFields(new DataInputStream(inStream));
- return t;
- } catch (InstantiationException | IllegalAccessException e) {
- throw new CoderException("unable to deserialize record", e);
- }
- }
-
- @Override
- public List<Coder<?>> getCoderArguments() {
- return null;
- }
-
- @Override
- public CloudObject asCloudObject() {
- CloudObject result = super.asCloudObject();
- result.put("type", type.getName());
- return result;
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- throw new NonDeterministicException(this,
- "Hadoop Writable may be non-deterministic.");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/contrib/hadoop/src/test/java/org/apache/contrib/hadoop/HadoopFileSourceTest.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop/src/test/java/org/apache/contrib/hadoop/HadoopFileSourceTest.java b/contrib/hadoop/src/test/java/org/apache/contrib/hadoop/HadoopFileSourceTest.java
deleted file mode 100644
index 72bd72a..0000000
--- a/contrib/hadoop/src/test/java/org/apache/contrib/hadoop/HadoopFileSourceTest.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.hadoop;
-
-import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Source;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.SourceTestUtils;
-import org.apache.beam.sdk.values.KV;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-/**
- * Tests for HadoopFileSource.
- */
-public class HadoopFileSourceTest {
-
- Random random = new Random(0L);
-
- @Rule
- public TemporaryFolder tmpFolder = new TemporaryFolder();
-
- @Test
- public void testFullyReadSingleFile() throws Exception {
- PipelineOptions options = PipelineOptionsFactory.create();
- List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0);
- File file = createFileWithData("tmp.seq", expectedResults);
-
- HadoopFileSource<IntWritable, Text> source =
- HadoopFileSource.from(file.toString(), SequenceFileInputFormat.class,
- IntWritable.class, Text.class);
-
- assertEquals(file.length(), source.getEstimatedSizeBytes(null));
-
- assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
- }
-
- @Test
- public void testFullyReadFilePattern() throws IOException {
- PipelineOptions options = PipelineOptionsFactory.create();
- List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
- File file1 = createFileWithData("file1", data1);
-
- List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
- createFileWithData("file2", data2);
-
- List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
- createFileWithData("file3", data3);
-
- List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
- createFileWithData("otherfile", data4);
-
- HadoopFileSource<IntWritable, Text> source =
- HadoopFileSource.from(new File(file1.getParent(), "file*").toString(),
- SequenceFileInputFormat.class, IntWritable.class, Text.class);
- List<KV<IntWritable, Text>> expectedResults = new ArrayList<>();
- expectedResults.addAll(data1);
- expectedResults.addAll(data2);
- expectedResults.addAll(data3);
- assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
- }
-
- @Test
- public void testCloseUnstartedFilePatternReader() throws IOException {
- PipelineOptions options = PipelineOptionsFactory.create();
- List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
- File file1 = createFileWithData("file1", data1);
-
- List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
- createFileWithData("file2", data2);
-
- List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
- createFileWithData("file3", data3);
-
- List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
- createFileWithData("otherfile", data4);
-
- HadoopFileSource<IntWritable, Text> source =
- HadoopFileSource.from(new File(file1.getParent(), "file*").toString(),
- SequenceFileInputFormat.class, IntWritable.class, Text.class);
- Source.Reader<KV<IntWritable, Text>> reader = source.createReader(options);
- // Closing an unstarted FilePatternReader should not throw an exception.
- try {
- reader.close();
- } catch (Exception e) {
- fail("Closing an unstarted FilePatternReader should not throw an exception");
- }
- }
-
- @Test
- public void testSplits() throws Exception {
- PipelineOptions options = PipelineOptionsFactory.create();
-
- List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
- File file = createFileWithData("tmp.avro", expectedResults);
-
- HadoopFileSource<IntWritable, Text> source =
- HadoopFileSource.from(file.toString(), SequenceFileInputFormat.class,
- IntWritable.class, Text.class);
-
- // Assert that the source produces the expected records
- assertEquals(expectedResults, readFromSource(source, options));
-
- // Split with a small bundle size (has to be at least size of sync interval)
- List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source
- .splitIntoBundles(SequenceFile.SYNC_INTERVAL, options);
- assertTrue(splits.size() > 2);
- SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
- int nonEmptySplits = 0;
- for (BoundedSource<KV<IntWritable, Text>> subSource : splits) {
- if (readFromSource(subSource, options).size() > 0) {
- nonEmptySplits += 1;
- }
- }
- assertTrue(nonEmptySplits > 2);
- }
-
- private File createFileWithData(String filename, List<KV<IntWritable, Text>> records)
- throws IOException {
- File tmpFile = tmpFolder.newFile(filename);
- try (Writer writer = SequenceFile.createWriter(new Configuration(),
- Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
- Writer.file(new Path(tmpFile.toURI())))) {
-
- for (KV<IntWritable, Text> record : records) {
- writer.append(record.getKey(), record.getValue());
- }
- }
- return tmpFile;
- }
-
- private List<KV<IntWritable, Text>> createRandomRecords(int dataItemLength,
- int numItems, int offset) {
- List<KV<IntWritable, Text>> records = new ArrayList<>();
- for (int i = 0; i < numItems; i++) {
- IntWritable key = new IntWritable(i + offset);
- Text value = new Text(createRandomString(dataItemLength));
- records.add(KV.of(key, value));
- }
- return records;
- }
-
- private String createRandomString(int length) {
- char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
- StringBuilder builder = new StringBuilder();
- for (int i = 0; i < length; i++) {
- builder.append(chars[random.nextInt(chars.length)]);
- }
- return builder.toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/contrib/hadoop/src/test/java/org/apache/contrib/hadoop/WritableCoderTest.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop/src/test/java/org/apache/contrib/hadoop/WritableCoderTest.java b/contrib/hadoop/src/test/java/org/apache/contrib/hadoop/WritableCoderTest.java
deleted file mode 100644
index 368b682..0000000
--- a/contrib/hadoop/src/test/java/org/apache/contrib/hadoop/WritableCoderTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.hadoop;
-
-import org.apache.beam.sdk.testing.CoderProperties;
-
-import org.apache.hadoop.io.IntWritable;
-import org.junit.Test;
-
-/**
- * Tests for WritableCoder.
- */
-public class WritableCoderTest {
-
- @Test
- public void testIntWritableEncoding() throws Exception {
- IntWritable value = new IntWritable(42);
- WritableCoder<IntWritable> coder = WritableCoder.of(IntWritable.class);
-
- CoderProperties.coderDecodeEncodeEqual(coder, value);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/sdks/java/io/hdfs/README.md
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/README.md b/sdks/java/io/hdfs/README.md
new file mode 100644
index 0000000..7149cda
--- /dev/null
+++ b/sdks/java/io/hdfs/README.md
@@ -0,0 +1,24 @@
+# HDFS IO
+
+This library provides HDFS sources and sinks to make it possible to read and
+write Apache Hadoop file formats from Apache Beam pipelines.
+
+Currently, only the read path is implemented. A `HDFSFileSource` allows any
+Hadoop `FileInputFormat` to be read as a `PCollection`.
+
+A `HDFSFileSource` can be read from using the
+`org.apache.beam.sdk.io.Read` transform. For example:
+
+```java
+HDFSFileSource<K, V> source = HDFSFileSource.from(path, MyInputFormat.class,
+ MyKey.class, MyValue.class);
+PCollection<KV<MyKey, MyValue>> records = Read.from(source);
+```
+
+Alternatively, the `readFrom` method is a convenience method that returns a read
+transform. For example:
+
+```java
+PCollection<KV<MyKey, MyValue>> records = HDFSFileSource.readFrom(path,
+ MyInputFormat.class, MyKey.class, MyValue.class);
+```
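
For orientation, here is a fuller sketch of how the new source might be wired into a pipeline. It is illustrative rather than part of this commit: the main-class scaffolding, runner options, and HDFS path are assumptions, while the SequenceFileInputFormat/IntWritable/Text combination mirrors the bundled HDFSFileSourceTest.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.hdfs.HDFSFileSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

public class HdfsReadExample {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    // Build the bounded source from a file or glob pattern (path is illustrative).
    HDFSFileSource<IntWritable, Text> source = HDFSFileSource.from(
        "hdfs://namenode/path/to/data*", SequenceFileInputFormat.class,
        IntWritable.class, Text.class);

    // Read.from turns the source into a root transform producing KV pairs.
    PCollection<KV<IntWritable, Text>> records = p.apply(Read.from(source));

    p.run();
  }
}
```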
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
new file mode 100644
index 0000000..3eaef62
--- /dev/null
+++ b/sdks/java/io/hdfs/pom.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>io-parent</artifactId>
+ <version>0.1.0-incubating-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>hdfs</artifactId>
+ <name>Apache Beam :: SDKs :: Java :: IO :: HDFS</name>
+ <description>Library to read and write Hadoop/HDFS file formats from Beam.</description>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>java-sdk-all</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>2.7.0</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- test dependencies -->
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
new file mode 100644
index 0000000..ab537eb
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.values.KV;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+
+/**
+ * A {@code BoundedSource} for reading files resident in a Hadoop filesystem (HDFS) using a
+ * Hadoop file-based input format.
+ *
+ * <p>To read a {@link org.apache.beam.sdk.values.PCollection} of
+ * {@link org.apache.beam.sdk.values.KV} key-value pairs from one or more
+ * Hadoop files, use {@link HDFSFileSource#from} to specify the path(s) of the files to
+ * read, the Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}, the
+ * key class and the value class.
+ *
+ * <p>A {@code HDFSFileSource} can be read from using the
+ * {@link org.apache.beam.sdk.io.Read} transform. For example:
+ *
+ * <pre>
+ * {@code
+ * HDFSFileSource<K, V> source = HDFSFileSource.from(path, MyInputFormat.class,
+ * MyKey.class, MyValue.class);
+ * PCollection<KV<MyKey, MyValue>> records = Read.from(source);
+ * }
+ * </pre>
+ *
+ * <p>The {@link HDFSFileSource#readFrom} method is a convenience method
+ * that returns a read transform. For example:
+ *
+ * <pre>
+ * {@code
+ * PCollection<KV<MyKey, MyValue>> records = HDFSFileSource.readFrom(path,
+ * MyInputFormat.class, MyKey.class, MyValue.class);
+ * }
+ * </pre>
+ *
+ * Implementation note: Since Hadoop's {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}
+ * determines the input splits, this class extends {@link BoundedSource} rather than
+ * {@link org.apache.beam.sdk.io.OffsetBasedSource}, since the latter
+ * dictates input splits.
+
+ * @param <K> The type of keys to be read from the source.
+ * @param <V> The type of values to be read from the source.
+ */
+public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
+ private static final long serialVersionUID = 0L;
+
+ private final String filepattern;
+ private final Class<? extends FileInputFormat<?, ?>> formatClass;
+ private final Class<K> keyClass;
+ private final Class<V> valueClass;
+ private final SerializableSplit serializableSplit;
+
+ /**
+ * Creates a {@code Read} transform that will read from an {@code HDFSFileSource}
+ * with the given file name or pattern ("glob") using the given Hadoop
+ * {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat},
+ * with key-value types specified by the given key class and value class.
+ */
+ public static <K, V, T extends FileInputFormat<K, V>> Read.Bounded<KV<K, V>> readFrom(
+ String filepattern, Class<T> formatClass, Class<K> keyClass, Class<V> valueClass) {
+ return Read.from(from(filepattern, formatClass, keyClass, valueClass));
+ }
+
+ /**
+ * Creates a {@code HDFSFileSource} that reads from the given file name or pattern ("glob")
+ * using the given Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat},
+ * with key-value types specified by the given key class and value class.
+ */
+ public static <K, V, T extends FileInputFormat<K, V>> HDFSFileSource<K, V> from(
+ String filepattern, Class<T> formatClass, Class<K> keyClass, Class<V> valueClass) {
+ @SuppressWarnings("unchecked")
+ HDFSFileSource<K, V> source = (HDFSFileSource<K, V>)
+ new HDFSFileSource(filepattern, formatClass, keyClass, valueClass);
+ return source;
+ }
+
+ /**
+ * Create a {@code HDFSFileSource} based on a file or a file pattern specification.
+ */
+ private HDFSFileSource(String filepattern,
+ Class<? extends FileInputFormat<?, ?>> formatClass, Class<K> keyClass,
+ Class<V> valueClass) {
+ this(filepattern, formatClass, keyClass, valueClass, null);
+ }
+
+ /**
+ * Create a {@code HDFSFileSource} based on a single Hadoop input split, which won't be
+ * split up further.
+ */
+ private HDFSFileSource(String filepattern,
+ Class<? extends FileInputFormat<?, ?>> formatClass, Class<K> keyClass,
+ Class<V> valueClass, SerializableSplit serializableSplit) {
+ this.filepattern = filepattern;
+ this.formatClass = formatClass;
+ this.keyClass = keyClass;
+ this.valueClass = valueClass;
+ this.serializableSplit = serializableSplit;
+ }
+
+ public String getFilepattern() {
+ return filepattern;
+ }
+
+ public Class<? extends FileInputFormat<?, ?>> getFormatClass() {
+ return formatClass;
+ }
+
+ public Class<K> getKeyClass() {
+ return keyClass;
+ }
+
+ public Class<V> getValueClass() {
+ return valueClass;
+ }
+
+ @Override
+ public void validate() {
+ Preconditions.checkNotNull(filepattern,
+ "need to set the filepattern of a HDFSFileSource");
+ Preconditions.checkNotNull(formatClass,
+ "need to set the format class of a HDFSFileSource");
+ Preconditions.checkNotNull(keyClass,
+ "need to set the key class of a HDFSFileSource");
+ Preconditions.checkNotNull(valueClass,
+ "need to set the value class of a HDFSFileSource");
+ }
+
+ @Override
+ public List<? extends BoundedSource<KV<K, V>>> splitIntoBundles(long desiredBundleSizeBytes,
+ PipelineOptions options) throws Exception {
+ if (serializableSplit == null) {
+ return Lists.transform(computeSplits(desiredBundleSizeBytes),
+ new Function<InputSplit, BoundedSource<KV<K, V>>>() {
+ @Nullable @Override
+ public BoundedSource<KV<K, V>> apply(@Nullable InputSplit inputSplit) {
+ return new HDFSFileSource<K, V>(filepattern, formatClass, keyClass,
+ valueClass, new SerializableSplit(inputSplit));
+ }
+ });
+ } else {
+ return ImmutableList.of(this);
+ }
+ }
+
+ private FileInputFormat<?, ?> createFormat(Job job) throws IOException, IllegalAccessException,
+ InstantiationException {
+ Path path = new Path(filepattern);
+ FileInputFormat.addInputPath(job, path);
+ return formatClass.newInstance();
+ }
+
+ private List<InputSplit> computeSplits(long desiredBundleSizeBytes) throws IOException,
+ IllegalAccessException, InstantiationException {
+ Job job = Job.getInstance();
+ FileInputFormat.setMinInputSplitSize(job, desiredBundleSizeBytes);
+ FileInputFormat.setMaxInputSplitSize(job, desiredBundleSizeBytes);
+ return createFormat(job).getSplits(job);
+ }
+
+ @Override
+ public BoundedReader<KV<K, V>> createReader(PipelineOptions options) throws IOException {
+ this.validate();
+
+ if (serializableSplit == null) {
+ return new HDFSFileReader<>(this, filepattern, formatClass);
+ } else {
+ return new HDFSFileReader<>(this, filepattern, formatClass,
+ serializableSplit.getSplit());
+ }
+ }
+
+ @Override
+ public Coder<KV<K, V>> getDefaultOutputCoder() {
+ return KvCoder.of(getDefaultCoder(keyClass), getDefaultCoder(valueClass));
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T> Coder<T> getDefaultCoder(Class<T> c) {
+ if (Writable.class.isAssignableFrom(c)) {
+ Class<? extends Writable> writableClass = (Class<? extends Writable>) c;
+ return (Coder<T>) WritableCoder.of(writableClass);
+ } else if (Void.class.equals(c)) {
+ return (Coder<T>) VoidCoder.of();
+ }
+ // TODO: how to use registered coders here?
+ throw new IllegalStateException("Cannot find coder for " + c);
+ }
+
+ // BoundedSource
+
+ @Override
+ public long getEstimatedSizeBytes(PipelineOptions options) {
+ long size = 0;
+ try {
+ Job job = Job.getInstance(); // new instance
+ for (FileStatus st : listStatus(createFormat(job), job)) {
+ size += st.getLen();
+ }
+ } catch (IOException | NoSuchMethodException | InvocationTargetException
+ | IllegalAccessException | InstantiationException e) {
+ // ignore, and return 0
+ }
+ return size;
+ }
+
+ private <K, V> List<FileStatus> listStatus(FileInputFormat<K, V> format,
+ JobContext jobContext) throws NoSuchMethodException, InvocationTargetException,
+ IllegalAccessException {
+ // FileInputFormat#listStatus is protected, so call using reflection
+ Method listStatus = FileInputFormat.class.getDeclaredMethod("listStatus", JobContext.class);
+ listStatus.setAccessible(true);
+ @SuppressWarnings("unchecked")
+ List<FileStatus> stat = (List<FileStatus>) listStatus.invoke(format, jobContext);
+ return stat;
+ }
+
+ @Override
+ public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+ return false;
+ }
+
+ static class HDFSFileReader<K, V> extends BoundedSource.BoundedReader<KV<K, V>> {
+
+ private final BoundedSource<KV<K, V>> source;
+ private final String filepattern;
+ private final Class formatClass;
+
+ private FileInputFormat<?, ?> format;
+ private TaskAttemptContext attemptContext;
+ private List<InputSplit> splits;
+ private ListIterator<InputSplit> splitsIterator;
+ private Configuration conf;
+ private RecordReader<K, V> currentReader;
+ private KV<K, V> currentPair;
+
+ /**
+ * Create a {@code HDFSFileReader} based on a file or a file pattern specification.
+ */
+ public HDFSFileReader(BoundedSource<KV<K, V>> source, String filepattern,
+ Class<? extends FileInputFormat<?, ?>> formatClass) {
+ this(source, filepattern, formatClass, null);
+ }
+
+ /**
+ * Create a {@code HDFSFileReader} based on a single Hadoop input split.
+ */
+ public HDFSFileReader(BoundedSource<KV<K, V>> source, String filepattern,
+ Class<? extends FileInputFormat<?, ?>> formatClass, InputSplit split) {
+ this.source = source;
+ this.filepattern = filepattern;
+ this.formatClass = formatClass;
+ if (split != null) {
+ this.splits = ImmutableList.of(split);
+ this.splitsIterator = splits.listIterator();
+ }
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ Job job = Job.getInstance(); // new instance
+ Path path = new Path(filepattern);
+ FileInputFormat.addInputPath(job, path);
+
+ try {
+ @SuppressWarnings("unchecked")
+ FileInputFormat<K, V> f = (FileInputFormat<K, V>) formatClass.newInstance();
+ this.format = f;
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new IOException("Cannot instantiate file input format " + formatClass, e);
+ }
+ this.attemptContext = new TaskAttemptContextImpl(job.getConfiguration(),
+ new TaskAttemptID());
+
+ if (splitsIterator == null) {
+ this.splits = format.getSplits(job);
+ this.splitsIterator = splits.listIterator();
+ }
+ this.conf = job.getConfiguration();
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ try {
+ if (currentReader != null && currentReader.nextKeyValue()) {
+ currentPair = nextPair();
+ return true;
+ } else {
+ while (splitsIterator.hasNext()) {
+ // advance the reader and see if it has records
+ InputSplit nextSplit = splitsIterator.next();
+ @SuppressWarnings("unchecked")
+ RecordReader<K, V> reader =
+ (RecordReader<K, V>) format.createRecordReader(nextSplit, attemptContext);
+ if (currentReader != null) {
+ currentReader.close();
+ }
+ currentReader = reader;
+ currentReader.initialize(nextSplit, attemptContext);
+ if (currentReader.nextKeyValue()) {
+ currentPair = nextPair();
+ return true;
+ }
+ currentReader.close();
+ currentReader = null;
+ }
+ // either no next split or all readers were empty
+ currentPair = null;
+ return false;
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private KV<K, V> nextPair() throws IOException, InterruptedException {
+ K key = currentReader.getCurrentKey();
+ V value = currentReader.getCurrentValue();
+ // clone Writable objects since they are reused between calls to RecordReader#nextKeyValue
+ if (key instanceof Writable) {
+ key = (K) WritableUtils.clone((Writable) key, conf);
+ }
+ if (value instanceof Writable) {
+ value = (V) WritableUtils.clone((Writable) value, conf);
+ }
+ return KV.of(key, value);
+ }
+
+ @Override
+ public KV<K, V> getCurrent() throws NoSuchElementException {
+ if (currentPair == null) {
+ throw new NoSuchElementException();
+ }
+ return currentPair;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (currentReader != null) {
+ currentReader.close();
+ currentReader = null;
+ }
+ currentPair = null;
+ }
+
+ @Override
+ public BoundedSource<KV<K, V>> getCurrentSource() {
+ return source;
+ }
+
+ // BoundedReader
+
+ @Override
+ public Double getFractionConsumed() {
+ if (currentReader == null) {
+ return 0.0;
+ }
+ if (splits.isEmpty()) {
+ return 1.0;
+ }
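+ // Estimate progress as the fraction of splits already consumed, then interpolate
+ // within the current split using the reader's reported progress.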
+ int index = splitsIterator.previousIndex();
+ int numReaders = splits.size();
+ if (index == numReaders) {
+ return 1.0;
+ }
+ double before = 1.0 * index / numReaders;
+ double after = 1.0 * (index + 1) / numReaders;
+ Double fractionOfCurrentReader = getProgress();
+ if (fractionOfCurrentReader == null) {
+ return before;
+ }
+ return before + fractionOfCurrentReader * (after - before);
+ }
+
+ private Double getProgress() {
+ try {
+ return (double) currentReader.getProgress();
+ } catch (IOException | InterruptedException e) {
+ return null;
+ }
+ }
+
+ @Override
+ public BoundedSource<KV<K, V>> splitAtFraction(double fraction) {
+ // Not yet supported. To implement this, the sizes of the splits should be used to
+ // calculate the remaining splits that constitute the given fraction, then a
+ // new source backed by those splits should be returned.
+ return null;
+ }
+ }
+
+ /**
+ * A wrapper to allow Hadoop {@link org.apache.hadoop.mapreduce.InputSplit}s to be
+ * serialized using Java's standard serialization mechanisms. Note that the InputSplit
+ * must implement {@link org.apache.hadoop.io.Writable}, as most do.
+ */
+ public static class SerializableSplit implements Externalizable {
+ private static final long serialVersionUID = 0L;
+
+ private InputSplit split;
+
+ public SerializableSplit() {
+ }
+
+ public SerializableSplit(InputSplit split) {
+ Preconditions.checkArgument(split instanceof Writable, "Split is not writable: "
+ + split);
+ this.split = split;
+ }
+
+ public InputSplit getSplit() {
+ return split;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeUTF(split.getClass().getCanonicalName());
+ ((Writable) split).write(out);
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ String className = in.readUTF();
+ try {
+ split = (InputSplit) Class.forName(className).newInstance();
+ ((Writable) split).readFields(in);
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+
+}
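For reference, here is a minimal sketch of how the SerializableSplit wrapper above can round-trip a Writable split through standard Java serialization. This snippet is not part of the patch; FileSplit, the path, and the class name below are placeholder choices used only for illustration.

```java
// Sketch only: exercises SerializableSplit outside the patch.
import org.apache.beam.sdk.io.hdfs.HDFSFileSource;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class SerializableSplitRoundTrip {
  public static void main(String[] args) throws Exception {
    // FileSplit is Writable, so it satisfies the precondition in the constructor.
    FileSplit hadoopSplit = new FileSplit(new Path("/tmp/data.seq"), 0L, 1024L, new String[0]);
    HDFSFileSource.SerializableSplit wrapper = new HDFSFileSource.SerializableSplit(hadoopSplit);

    // Java serialization calls writeExternal, which writes the split's class name
    // followed by its Writable fields.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(wrapper);
    }

    // readExternal re-instantiates the split by class name and reads the fields back.
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      HDFSFileSource.SerializableSplit copy = (HDFSFileSource.SerializableSplit) in.readObject();
      FileSplit restored = (FileSplit) copy.getSplit();
      System.out.println(restored.getPath() + " length=" + restored.getLength());
    }
  }
}
```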
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
new file mode 100644
index 0000000..814a762
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.util.CloudObject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.io.Writable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+/**
+ * A {@code WritableCoder} is a {@link org.apache.beam.sdk.coders.Coder} for a
+ * Java class that implements {@link org.apache.hadoop.io.Writable}.
+ *
+ * <p> To use, specify the coder type on a PCollection:
+ * <pre>
+ * {@code
+ * PCollection<MyRecord> records =
+ * foo.apply(...).setCoder(WritableCoder.of(MyRecord.class));
+ * }
+ * </pre>
+ *
+ * @param <T> the type of elements handled by this coder
+ */
+public class WritableCoder<T extends Writable> extends StandardCoder<T> {
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Returns a {@code WritableCoder} instance for the provided element class.
+ * @param <T> the element type
+ */
+ public static <T extends Writable> WritableCoder<T> of(Class<T> clazz) {
+ return new WritableCoder<>(clazz);
+ }
+
+ @JsonCreator
+ @SuppressWarnings("unchecked")
+ public static WritableCoder<?> of(@JsonProperty("type") String classType)
+ throws ClassNotFoundException {
+ Class<?> clazz = Class.forName(classType);
+ if (!Writable.class.isAssignableFrom(clazz)) {
+ throw new ClassNotFoundException(
+ "Class " + classType + " does not implement Writable");
+ }
+ return of((Class<? extends Writable>) clazz);
+ }
+
+ private final Class<T> type;
+
+ public WritableCoder(Class<T> type) {
+ this.type = type;
+ }
+
+ @Override
+ public void encode(T value, OutputStream outStream, Context context) throws IOException {
+ value.write(new DataOutputStream(outStream));
+ }
+
+ @Override
+ public T decode(InputStream inStream, Context context) throws IOException {
+ try {
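+ // Relies on the Writable type having a public no-argument constructor.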
+ T t = type.newInstance();
+ t.readFields(new DataInputStream(inStream));
+ return t;
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new CoderException("unable to deserialize record", e);
+ }
+ }
+
+ @Override
+ public List<Coder<?>> getCoderArguments() {
+ return null;
+ }
+
+ @Override
+ public CloudObject asCloudObject() {
+ CloudObject result = super.asCloudObject();
+ result.put("type", type.getName());
+ return result;
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ throw new NonDeterministicException(this,
+ "Hadoop Writable may be non-deterministic.");
+ }
+
+}
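For reference, a minimal sketch of a coder round-trip using the encode/decode signatures shown above. This snippet is not part of the patch; the class and value names are illustrative only.

```java
// Sketch only: round-trips an IntWritable through WritableCoder.
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.hdfs.WritableCoder;
import org.apache.hadoop.io.IntWritable;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

public class WritableCoderRoundTrip {
  public static void main(String[] args) throws Exception {
    WritableCoder<IntWritable> coder = WritableCoder.of(IntWritable.class);
    IntWritable original = new IntWritable(42);

    // encode writes the Writable's own binary form to the stream.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    coder.encode(original, out, Coder.Context.OUTER);

    // decode instantiates a new IntWritable and reads the fields back.
    IntWritable copy = coder.decode(
        new ByteArrayInputStream(out.toByteArray()), Coder.Context.OUTER);
    System.out.println(copy);  // prints 42
  }
}
```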
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
new file mode 100644
index 0000000..67df7bc
--- /dev/null
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.values.KV;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Tests for HDFSFileSource.
+ */
+public class HDFSFileSourceTest {
+
+ Random random = new Random(0L);
+
+ @Rule
+ public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @Test
+ public void testFullyReadSingleFile() throws Exception {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0);
+ File file = createFileWithData("tmp.seq", expectedResults);
+
+ HDFSFileSource<IntWritable, Text> source =
+ HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
+ IntWritable.class, Text.class);
+
+ assertEquals(file.length(), source.getEstimatedSizeBytes(null));
+
+ assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
+ }
+
+ @Test
+ public void testFullyReadFilePattern() throws IOException {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
+ File file1 = createFileWithData("file1", data1);
+
+ List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
+ createFileWithData("file2", data2);
+
+ List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
+ createFileWithData("file3", data3);
+
+ List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
+ createFileWithData("otherfile", data4);
+
+ HDFSFileSource<IntWritable, Text> source =
+ HDFSFileSource.from(new File(file1.getParent(), "file*").toString(),
+ SequenceFileInputFormat.class, IntWritable.class, Text.class);
+ List<KV<IntWritable, Text>> expectedResults = new ArrayList<>();
+ expectedResults.addAll(data1);
+ expectedResults.addAll(data2);
+ expectedResults.addAll(data3);
+ assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
+ }
+
+ @Test
+ public void testCloseUnstartedFilePatternReader() throws IOException {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
+ File file1 = createFileWithData("file1", data1);
+
+ List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
+ createFileWithData("file2", data2);
+
+ List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
+ createFileWithData("file3", data3);
+
+ List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
+ createFileWithData("otherfile", data4);
+
+ HDFSFileSource<IntWritable, Text> source =
+ HDFSFileSource.from(new File(file1.getParent(), "file*").toString(),
+ SequenceFileInputFormat.class, IntWritable.class, Text.class);
+ Source.Reader<KV<IntWritable, Text>> reader = source.createReader(options);
+ // Closing an unstarted FilePatternReader should not throw an exception.
+ try {
+ reader.close();
+ } catch (Exception e) {
+ fail("Closing an unstarted FilePatternReader should not throw an exception");
+ }
+ }
+
+ @Test
+ public void testSplits() throws Exception {
+ PipelineOptions options = PipelineOptionsFactory.create();
+
+ List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
+ File file = createFileWithData("tmp.avro", expectedResults);
+
+ HDFSFileSource<IntWritable, Text> source =
+ HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
+ IntWritable.class, Text.class);
+
+ // Assert that the source produces the expected records
+ assertEquals(expectedResults, readFromSource(source, options));
+
+ // Split with a small bundle size (has to be at least size of sync interval)
+ List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source
+ .splitIntoBundles(SequenceFile.SYNC_INTERVAL, options);
+ assertTrue(splits.size() > 2);
+ SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
+ int nonEmptySplits = 0;
+ for (BoundedSource<KV<IntWritable, Text>> subSource : splits) {
+ if (readFromSource(subSource, options).size() > 0) {
+ nonEmptySplits += 1;
+ }
+ }
+ assertTrue(nonEmptySplits > 2);
+ }
+
+ private File createFileWithData(String filename, List<KV<IntWritable, Text>> records)
+ throws IOException {
+ File tmpFile = tmpFolder.newFile(filename);
+ try (Writer writer = SequenceFile.createWriter(new Configuration(),
+ Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
+ Writer.file(new Path(tmpFile.toURI())))) {
+
+ for (KV<IntWritable, Text> record : records) {
+ writer.append(record.getKey(), record.getValue());
+ }
+ }
+ return tmpFile;
+ }
+
+ private List<KV<IntWritable, Text>> createRandomRecords(int dataItemLength,
+ int numItems, int offset) {
+ List<KV<IntWritable, Text>> records = new ArrayList<>();
+ for (int i = 0; i < numItems; i++) {
+ IntWritable key = new IntWritable(i + offset);
+ Text value = new Text(createRandomString(dataItemLength));
+ records.add(KV.of(key, value));
+ }
+ return records;
+ }
+
+ private String createRandomString(int length) {
+ char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < length; i++) {
+ builder.append(chars[random.nextInt(chars.length)]);
+ }
+ return builder.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java
new file mode 100644
index 0000000..715da8e
--- /dev/null
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import org.apache.beam.sdk.testing.CoderProperties;
+
+import org.apache.hadoop.io.IntWritable;
+import org.junit.Test;
+
+/**
+ * Tests for WritableCoder.
+ */
+public class WritableCoderTest {
+
+ @Test
+ public void testIntWritableEncoding() throws Exception {
+ IntWritable value = new IntWritable(42);
+ WritableCoder<IntWritable> coder = WritableCoder.of(IntWritable.class);
+
+ CoderProperties.coderDecodeEncodeEqual(coder, value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
new file mode 100644
index 0000000..75f192c
--- /dev/null
+++ b/sdks/java/io/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>java-sdk-parent</artifactId>
+ <version>0.1.0-incubating-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>io-parent</artifactId>
+ <packaging>pom</packaging>
+ <name>Apache Beam :: SDKs :: Java :: IO</name>
+ <description>Beam SDK Java IO provides connectivity components
+ (sources and sinks) for consuming data from and producing data to external systems.</description>
+
+ <modules>
+ <module>hdfs</module>
+ </modules>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/sdks/java/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index 1111d92..6bd7ee7 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -36,6 +36,7 @@
<modules>
<module>core</module>
+ <module>io</module>
<!-- sdks/java/maven-archtypes has several dependencies on the
DataflowPipelineRunner. Until these are refactored out or
a released artifact exists, we need to modify the build order.
[2/2] incubator-beam git commit: This closes #96
Posted by da...@apache.org.
This closes #96
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c8ed3980
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c8ed3980
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c8ed3980
Branch: refs/heads/master
Commit: c8ed3980687f22c0d2342ed7f7117b9e6550e1c6
Parents: bcefff6 404b633
Author: Davor Bonaci <da...@google.com>
Authored: Fri Apr 15 10:28:53 2016 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Fri Apr 15 10:28:53 2016 -0700
----------------------------------------------------------------------
contrib/hadoop/README.md | 24 -
contrib/hadoop/pom.xml | 170 -------
.../apache/contrib/hadoop/HadoopFileSource.java | 486 -------------------
.../apache/contrib/hadoop/WritableCoder.java | 111 -----
.../contrib/hadoop/HadoopFileSourceTest.java | 190 --------
.../contrib/hadoop/WritableCoderTest.java | 37 --
sdks/java/io/hdfs/README.md | 24 +
sdks/java/io/hdfs/pom.xml | 65 +++
.../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 486 +++++++++++++++++++
.../apache/beam/sdk/io/hdfs/WritableCoder.java | 111 +++++
.../beam/sdk/io/hdfs/HDFSFileSourceTest.java | 190 ++++++++
.../beam/sdk/io/hdfs/WritableCoderTest.java | 37 ++
sdks/java/io/pom.xml | 41 ++
sdks/java/pom.xml | 1 +
14 files changed, 955 insertions(+), 1018 deletions(-)
----------------------------------------------------------------------