Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/05 16:57:22 UTC
[1/3] beam git commit: [BEAM-2016] Delete HdfsFileSource & Sink
Repository: beam
Updated Branches:
refs/heads/master 3bffe0e00 -> 610bda168
http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
deleted file mode 100644
index 9fa6606..0000000
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static org.junit.Assert.assertEquals;
-
-import com.google.common.base.MoreObjects;
-import java.io.File;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.UUID;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.KV;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Tests for HDFSFileSink.
- */
-public class HDFSFileSinkTest {
-
- @Rule
- public TemporaryFolder tmpFolder = new TemporaryFolder();
-
- private final String part0 = "part-r-00000";
- private final String foobar = "foobar";
-
- private <T> void doWrite(Sink<T> sink,
- PipelineOptions options,
- Iterable<T> toWrite) throws Exception {
- Sink.WriteOperation<T, String> writeOperation =
- (Sink.WriteOperation<T, String>) sink.createWriteOperation();
- Sink.Writer<T, String> writer = writeOperation.createWriter(options);
- writer.openUnwindowed(UUID.randomUUID().toString(), -1, -1);
- for (T t: toWrite) {
- writer.write(t);
- }
- String writeResult = writer.close();
- writeOperation.finalize(Collections.singletonList(writeResult), options);
- }
-
- @Test
- public void testWriteSingleRecord() throws Exception {
- PipelineOptions options = PipelineOptionsFactory.create();
- File file = tmpFolder.newFolder();
-
- HDFSFileSink<String, NullWritable, Text> sink =
- HDFSFileSink.to(
- file.toString(),
- SequenceFileOutputFormat.class,
- NullWritable.class,
- Text.class,
- new SerializableFunction<String, KV<NullWritable, Text>>() {
- @Override
- public KV<NullWritable, Text> apply(String input) {
- return KV.of(NullWritable.get(), new Text(input));
- }
- });
-
- doWrite(sink, options, Collections.singletonList(foobar));
-
- SequenceFile.Reader.Option opts =
- SequenceFile.Reader.file(new Path(file.toString(), part0));
- SequenceFile.Reader reader = new SequenceFile.Reader(new Configuration(), opts);
- assertEquals(NullWritable.class.getName(), reader.getKeyClassName());
- assertEquals(Text.class.getName(), reader.getValueClassName());
- NullWritable k = NullWritable.get();
- Text v = new Text();
- assertEquals(true, reader.next(k, v));
- assertEquals(NullWritable.get(), k);
- assertEquals(new Text(foobar), v);
- }
-
- @Test
- public void testToText() throws Exception {
- PipelineOptions options = PipelineOptionsFactory.create();
- File file = tmpFolder.newFolder();
-
- HDFSFileSink<String, NullWritable, Text> sink = HDFSFileSink.toText(file.toString());
-
- doWrite(sink, options, Collections.singletonList(foobar));
-
- List<String> strings = Files.readAllLines(new File(file.toString(), part0).toPath(),
- Charset.forName("UTF-8"));
- assertEquals(Collections.singletonList(foobar), strings);
- }
-
- @DefaultCoder(AvroCoder.class)
- static class GenericClass {
- int intField;
- String stringField;
- public GenericClass() {}
- public GenericClass(int intValue, String stringValue) {
- this.intField = intValue;
- this.stringField = stringValue;
- }
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("intField", intField)
- .add("stringField", stringField)
- .toString();
- }
- @Override
- public int hashCode() {
- return Objects.hash(intField, stringField);
- }
- @Override
- public boolean equals(Object other) {
- if (!(other instanceof GenericClass)) {
- return false;
- }
- GenericClass o = (GenericClass) other;
- return Objects.equals(intField, o.intField) && Objects.equals(stringField, o.stringField);
- }
- }
-
- @Test
- public void testToAvro() throws Exception {
- PipelineOptions options = PipelineOptionsFactory.create();
- File file = tmpFolder.newFolder();
-
- HDFSFileSink<GenericClass, AvroKey<GenericClass>, NullWritable> sink = HDFSFileSink.toAvro(
- file.toString(),
- AvroCoder.of(GenericClass.class),
- new Configuration(false));
-
- doWrite(sink, options, Collections.singletonList(new GenericClass(3, "foobar")));
-
- GenericDatumReader datumReader = new GenericDatumReader();
- FileReader<GenericData.Record> reader =
- DataFileReader.openReader(new File(file.getAbsolutePath(), part0 + ".avro"), datumReader);
- GenericData.Record next = reader.next(null);
- assertEquals("foobar", next.get("stringField").toString());
- assertEquals(3, next.get("intField"));
- }
-
-}
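For context: the test above drives the deleted Sink lifecycle by hand (createWriteOperation -> createWriter -> openUnwindowed -> write -> close -> finalize). In a pipeline, the same sink was applied through this module's Write transform. A minimal sketch, assuming the pre-deletion HDFSFileSink and Write APIs that this commit removes (the output path is illustrative):

```java
// Sketch only: HDFSFileSink and Write are the APIs deleted by this commit.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.hdfs.HDFSFileSink;
import org.apache.beam.sdk.io.hdfs.Write;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

public class WriteTextSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    // toText wraps Hadoop's TextOutputFormat; each element becomes one output line.
    // The sink's validate() requires that the output path not exist yet.
    HDFSFileSink<String, NullWritable, Text> sink =
        HDFSFileSink.toText("hdfs:///tmp/demo-out"); // illustrative path
    p.apply(Create.of("foo", "bar")).apply(Write.to(sink));
    p.run();
  }
}
```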
http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
deleted file mode 100644
index a964239..0000000
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Source;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.SourceTestUtils;
-import org.apache.beam.sdk.values.KV;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Tests for HDFSFileSource.
- */
-public class HDFSFileSourceTest {
-
- private Random random = new Random(0L);
-
- @Rule
- public TemporaryFolder tmpFolder = new TemporaryFolder();
-
- @Test
- public void testFullyReadSingleFile() throws Exception {
- PipelineOptions options = PipelineOptionsFactory.create();
- List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0);
- File file = createFileWithData("tmp.seq", expectedResults);
-
- HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
- HDFSFileSource.from(
- file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class);
-
- assertEquals(file.length(), source.getEstimatedSizeBytes(null));
-
- assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
- }
-
- @Test
- public void testFullyReadSingleFileWithSpaces() throws Exception {
- PipelineOptions options = PipelineOptionsFactory.create();
- List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0);
- File file = createFileWithData("tmp data.seq", expectedResults);
-
- HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
- HDFSFileSource.from(
- file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class);
-
- assertEquals(file.length(), source.getEstimatedSizeBytes(null));
-
- assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
- }
-
- @Test
- public void testFullyReadFilePattern() throws IOException {
- PipelineOptions options = PipelineOptionsFactory.create();
- List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
- File file1 = createFileWithData("file1", data1);
-
- List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
- createFileWithData("file2", data2);
-
- List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
- createFileWithData("file3", data3);
-
- List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
- createFileWithData("otherfile", data4);
-
- List<KV<IntWritable, Text>> expectedResults = new ArrayList<>();
- expectedResults.addAll(data1);
- expectedResults.addAll(data2);
- expectedResults.addAll(data3);
-
- HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
- HDFSFileSource.from(
- new File(file1.getParent(), "file*").toString(), SequenceFileInputFormat.class,
- IntWritable.class, Text.class);
-
- assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
- }
-
- @Test
- public void testCloseUnstartedFilePatternReader() throws IOException {
- PipelineOptions options = PipelineOptionsFactory.create();
- List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
- File file1 = createFileWithData("file1", data1);
-
- List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
- createFileWithData("file2", data2);
-
- List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
- createFileWithData("file3", data3);
-
- List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
- createFileWithData("otherfile", data4);
-
- HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
- HDFSFileSource.from(
- new File(file1.getParent(), "file*").toString(),
- SequenceFileInputFormat.class, IntWritable.class, Text.class);
- Source.Reader<KV<IntWritable, Text>> reader = source.createReader(options);
-
- // Closing an unstarted FilePatternReader should not throw an exception.
- try {
- reader.close();
- } catch (Exception e) {
- fail("Closing an unstarted FilePatternReader should not throw an exception");
- }
- }
-
- @Test
- public void testSplits() throws Exception {
- PipelineOptions options = PipelineOptionsFactory.create();
-
- List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
- File file = createFileWithData("tmp.seq", expectedResults);
-
- HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
- HDFSFileSource.from(
- file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class);
-
- // Assert that the source produces the expected records
- assertEquals(expectedResults, readFromSource(source, options));
-
- // Split with a small bundle size (it has to be at least the size of the sync interval)
- List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source
- .split(SequenceFile.SYNC_INTERVAL, options);
- assertTrue(splits.size() > 2);
- SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
- int nonEmptySplits = 0;
- for (BoundedSource<KV<IntWritable, Text>> subSource : splits) {
- if (readFromSource(subSource, options).size() > 0) {
- nonEmptySplits += 1;
- }
- }
- assertTrue(nonEmptySplits > 2);
- }
-
- @Test
- public void testSplitEstimatedSize() throws Exception {
- PipelineOptions options = PipelineOptionsFactory.create();
-
- List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
- File file = createFileWithData("tmp.avro", expectedResults);
-
- HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
- HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
- IntWritable.class, Text.class);
-
- long originalSize = source.getEstimatedSizeBytes(options);
- long splitTotalSize = 0;
- List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source.split(
- SequenceFile.SYNC_INTERVAL, options
- );
- for (BoundedSource<KV<IntWritable, Text>> splitSource : splits) {
- splitTotalSize += splitSource.getEstimatedSizeBytes(options);
- }
- // Assert that the estimated size of the whole is the sum of its parts
- assertEquals(originalSize, splitTotalSize);
- }
-
- private File createFileWithData(String filename, List<KV<IntWritable, Text>> records)
- throws IOException {
- File tmpFile = tmpFolder.newFile(filename);
- try (Writer writer = SequenceFile.createWriter(new Configuration(),
- Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
- Writer.file(new Path(tmpFile.toURI())))) {
-
- for (KV<IntWritable, Text> record : records) {
- writer.append(record.getKey(), record.getValue());
- }
- }
- return tmpFile;
- }
-
- private List<KV<IntWritable, Text>> createRandomRecords(int dataItemLength,
- int numItems, int offset) {
- List<KV<IntWritable, Text>> records = new ArrayList<>();
- for (int i = 0; i < numItems; i++) {
- IntWritable key = new IntWritable(i + offset);
- Text value = new Text(createRandomString(dataItemLength));
- records.add(KV.of(key, value));
- }
- return records;
- }
-
- private String createRandomString(int length) {
- char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
- StringBuilder builder = new StringBuilder();
- for (int i = 0; i < length; i++) {
- builder.append(chars[random.nextInt(chars.length)]);
- }
- return builder.toString();
- }
-
-}
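For context: the tests above exercise the deleted source through SourceTestUtils and readFromSource; a pipeline consumed it through the org.apache.beam.sdk.io.Read transform instead. A minimal sketch, assuming the pre-deletion HDFSFileSource API that this commit removes (the file pattern is illustrative):

```java
// Sketch only: HDFSFileSource is the API deleted by this commit.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.hdfs.HDFSFileSource;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

public class ReadSequenceFileSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    // from() infers a WritableCoder for the key/value classes, as in the tests above.
    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
        HDFSFileSource.from(
            "hdfs:///tmp/demo-in/*", // illustrative file pattern
            SequenceFileInputFormat.class, IntWritable.class, Text.class);
    PCollection<KV<IntWritable, Text>> records = p.apply(Read.from(source));
    p.run();
  }
}
```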
[3/3] beam git commit: This closes #2908
Posted by dh...@apache.org.
This closes #2908
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/610bda16
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/610bda16
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/610bda16
Branch: refs/heads/master
Commit: 610bda1682d132a2fad958743ab173cbfab085cf
Parents: 3bffe0e 7512a73
Author: Dan Halperin <dh...@google.com>
Authored: Fri May 5 09:57:15 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri May 5 09:57:15 2017 -0700
----------------------------------------------------------------------
sdks/java/io/hadoop-file-system/README.md | 43 --
sdks/java/io/hadoop-file-system/pom.xml | 24 -
.../apache/beam/sdk/io/hdfs/HDFSFileSink.java | 478 --------------
.../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 625 -------------------
.../java/org/apache/beam/sdk/io/hdfs/Sink.java | 195 ------
.../org/apache/beam/sdk/io/hdfs/UGIHelper.java | 38 --
.../java/org/apache/beam/sdk/io/hdfs/Write.java | 588 -----------------
.../apache/beam/sdk/io/hdfs/package-info.java | 3 +-
.../beam/sdk/io/hdfs/HDFSFileSinkTest.java | 172 -----
.../beam/sdk/io/hdfs/HDFSFileSourceTest.java | 231 -------
10 files changed, 2 insertions(+), 2395 deletions(-)
----------------------------------------------------------------------
[2/3] beam git commit: [BEAM-2016] Delete HdfsFileSource & Sink
Posted by dh...@apache.org.
[BEAM-2016] Delete HdfsFileSource & Sink
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7512a73c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7512a73c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7512a73c
Branch: refs/heads/master
Commit: 7512a73cf8aa2a527c89ecb054e92207411ed241
Parents: 3bffe0e
Author: Dan Halperin <dh...@google.com>
Authored: Thu May 4 18:33:16 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri May 5 08:48:04 2017 -0700
----------------------------------------------------------------------
sdks/java/io/hadoop-file-system/README.md | 43 --
sdks/java/io/hadoop-file-system/pom.xml | 24 -
.../apache/beam/sdk/io/hdfs/HDFSFileSink.java | 478 --------------
.../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 625 -------------------
.../java/org/apache/beam/sdk/io/hdfs/Sink.java | 195 ------
.../org/apache/beam/sdk/io/hdfs/UGIHelper.java | 38 --
.../java/org/apache/beam/sdk/io/hdfs/Write.java | 588 -----------------
.../apache/beam/sdk/io/hdfs/package-info.java | 3 +-
.../beam/sdk/io/hdfs/HDFSFileSinkTest.java | 172 -----
.../beam/sdk/io/hdfs/HDFSFileSourceTest.java | 231 -------
10 files changed, 2 insertions(+), 2395 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/README.md
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/README.md b/sdks/java/io/hadoop-file-system/README.md
deleted file mode 100644
index 3a734f2..0000000
--- a/sdks/java/io/hadoop-file-system/README.md
+++ /dev/null
@@ -1,43 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-
-# HDFS IO
-
-This library provides HDFS sources and sinks to make it possible to read and
-write Apache Hadoop file formats from Apache Beam pipelines.
-
-Currently, only the read path is implemented. An `HDFSFileSource` allows any
-Hadoop `FileInputFormat` to be read as a `PCollection`.
-
-An `HDFSFileSource` can be read from using the
-`org.apache.beam.sdk.io.Read` transform. For example:
-
-```java
-HDFSFileSource<KV<MyKey, MyValue>, MyKey, MyValue> source = HDFSFileSource.from(path,
- MyInputFormat.class, MyKey.class, MyValue.class);
-PCollection<KV<MyKey, MyValue>> records = pipeline.apply(Read.from(source));
-```
-
-Alternatively, the `readFrom` method is a convenience method that returns a read
-transform. For example:
-
-```java
-PCollection<KV<MyKey, MyValue>> records = pipeline.apply(HDFSFileSource.readFrom(path,
- MyInputFormat.class, MyKey.class, MyValue.class));
-```
http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/pom.xml b/sdks/java/io/hadoop-file-system/pom.xml
index 562277e..3b392c2 100644
--- a/sdks/java/io/hadoop-file-system/pom.xml
+++ b/sdks/java/io/hadoop-file-system/pom.xml
@@ -82,11 +82,6 @@
</dependency>
<dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-io-hadoop-common</artifactId>
- </dependency>
-
- <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
@@ -124,25 +119,6 @@
</dependency>
<dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-mapred</artifactId>
- <version>${avro.version}</version>
- <classifier>hadoop2</classifier>
- <exclusions>
- <!-- exclude old Jetty version of servlet API -->
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
deleted file mode 100644
index aee73c4..0000000
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
+++ /dev/null
@@ -1,478 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import java.io.IOException;
-import java.net.URI;
-import java.security.PrivilegedExceptionAction;
-import java.util.Random;
-import java.util.Set;
-import javax.annotation.Nullable;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapreduce.AvroKeyOutputFormat;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.values.KV;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-
-/**
- * A {@link Sink} for writing records to a Hadoop filesystem using a Hadoop file-based
- * output format.
- *
- * <p>To write a {@link org.apache.beam.sdk.values.PCollection} of elements of type T to a
- * Hadoop filesystem, use {@link HDFSFileSink#to}: specify the path (this can be any
- * Hadoop-supported filesystem: HDFS, S3, GCS, etc.), the Hadoop {@link FileOutputFormat}, the
- * key class K, the value class V, and finally the {@link SerializableFunction} that maps from T
- * to a {@link KV} of K and V.
- *
- * <p>{@code HDFSFileSink} can be used by {@link Write} to create a write
- * transform. See the example below.
- *
- * <p>{@code HDFSFileSink} comes with helper methods to write text and Apache Avro. For example:
- *
- * <pre>
- * {@code
- * HDFSFileSink<CustomSpecificAvroClass, AvroKey<CustomSpecificAvroClass>, NullWritable> sink =
- * HDFSFileSink.toAvro(path, AvroCoder.of(CustomSpecificAvroClass.class), conf);
- * avroRecordsPCollection.apply(Write.to(sink));
- * }
- * </pre>
- *
- * @param <T> the type of elements of the input {@link org.apache.beam.sdk.values.PCollection}.
- * @param <K> the type of keys to be written to the sink via {@link FileOutputFormat}.
- * @param <V> the type of values to be written to the sink via {@link FileOutputFormat}.
- */
-@AutoValue
-@Experimental
-public abstract class HDFSFileSink<T, K, V> extends Sink<T> {
-
- private static final JobID jobId = new JobID(
- Long.toString(System.currentTimeMillis()),
- new Random().nextInt(Integer.MAX_VALUE));
-
- public abstract String path();
- public abstract Class<? extends FileOutputFormat<K, V>> formatClass();
- public abstract Class<K> keyClass();
- public abstract Class<V> valueClass();
- public abstract SerializableFunction<T, KV<K, V>> outputConverter();
- public abstract SerializableConfiguration serializableConfiguration();
- public @Nullable abstract String username();
- public abstract boolean validate();
-
- // =======================================================================
- // Factory methods
- // =======================================================================
-
- public static <T, K, V, W extends FileOutputFormat<K, V>> HDFSFileSink<T, K, V>
- to(String path,
- Class<W> formatClass,
- Class<K> keyClass,
- Class<V> valueClass,
- SerializableFunction<T, KV<K, V>> outputConverter) {
- return HDFSFileSink.<T, K, V>builder()
- .setPath(path)
- .setFormatClass(formatClass)
- .setKeyClass(keyClass)
- .setValueClass(valueClass)
- .setOutputConverter(outputConverter)
- .setConfiguration(null)
- .setUsername(null)
- .setValidate(true)
- .build();
- }
-
- public static <T> HDFSFileSink<T, NullWritable, Text> toText(String path) {
- SerializableFunction<T, KV<NullWritable, Text>> outputConverter =
- new SerializableFunction<T, KV<NullWritable, Text>>() {
- @Override
- public KV<NullWritable, Text> apply(T input) {
- return KV.of(NullWritable.get(), new Text(input.toString()));
- }
- };
- return to(path, TextOutputFormat.class, NullWritable.class, Text.class, outputConverter);
- }
-
- /**
- * Helper to create an Avro sink given an {@link AvroCoder}. Keep in mind that the
- * configuration object is altered to enable Avro output.
- */
- public static <T> HDFSFileSink<T, AvroKey<T>, NullWritable> toAvro(String path,
- final AvroCoder<T> coder,
- Configuration conf) {
- SerializableFunction<T, KV<AvroKey<T>, NullWritable>> outputConverter =
- new SerializableFunction<T, KV<AvroKey<T>, NullWritable>>() {
- @Override
- public KV<AvroKey<T>, NullWritable> apply(T input) {
- return KV.of(new AvroKey<>(input), NullWritable.get());
- }
- };
- conf.set("avro.schema.output.key", coder.getSchema().toString());
- return to(
- path,
- AvroKeyOutputFormat.class,
- (Class<AvroKey<T>>) (Class<?>) AvroKey.class,
- NullWritable.class,
- outputConverter).withConfiguration(conf);
- }
-
- /**
- * Helper to create an Avro sink given a {@link Schema}. Keep in mind that the
- * configuration object is altered to enable Avro output.
- */
- public static HDFSFileSink<GenericRecord, AvroKey<GenericRecord>, NullWritable>
- toAvro(String path, Schema schema, Configuration conf) {
- return toAvro(path, AvroCoder.of(schema), conf);
- }
-
- /**
- * Helper to create an Avro sink given a {@link Class}. Keep in mind that the
- * configuration object is altered to enable Avro output.
- */
- public static <T> HDFSFileSink<T, AvroKey<T>, NullWritable> toAvro(String path,
- Class<T> cls,
- Configuration conf) {
- return toAvro(path, AvroCoder.of(cls), conf);
- }
-
- // =======================================================================
- // Builder methods
- // =======================================================================
-
- public abstract Builder<T, K, V> toBuilder();
- public static <T, K, V> Builder builder() {
- return new AutoValue_HDFSFileSink.Builder<>();
- }
-
- /**
- * AutoValue builder for {@link HDFSFileSink}.
- */
- @AutoValue.Builder
- public abstract static class Builder<T, K, V> {
- public abstract Builder<T, K, V> setPath(String path);
- public abstract Builder<T, K, V> setFormatClass(
- Class<? extends FileOutputFormat<K, V>> formatClass);
- public abstract Builder<T, K, V> setKeyClass(Class<K> keyClass);
- public abstract Builder<T, K, V> setValueClass(Class<V> valueClass);
- public abstract Builder<T, K, V> setOutputConverter(
- SerializableFunction<T, KV<K, V>> outputConverter);
- public abstract Builder<T, K, V> setSerializableConfiguration(
- SerializableConfiguration serializableConfiguration);
- public Builder<T, K, V> setConfiguration(@Nullable Configuration configuration) {
- if (configuration == null) {
- configuration = new Configuration(false);
- }
- return this.setSerializableConfiguration(new SerializableConfiguration(configuration));
- }
- public abstract Builder<T, K, V> setUsername(String username);
- public abstract Builder<T, K, V> setValidate(boolean validate);
- public abstract HDFSFileSink<T, K, V> build();
- }
-
- public HDFSFileSink<T, K, V> withConfiguration(@Nullable Configuration configuration) {
- return this.toBuilder().setConfiguration(configuration).build();
- }
-
- public HDFSFileSink<T, K, V> withUsername(@Nullable String username) {
- return this.toBuilder().setUsername(username).build();
- }
-
- // =======================================================================
- // Sink
- // =======================================================================
-
- @Override
- public void validate(PipelineOptions options) {
- if (validate()) {
- try {
- UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- FileSystem fs = FileSystem.get(new URI(path()),
- SerializableConfiguration.newConfiguration(serializableConfiguration()));
- checkState(!fs.exists(new Path(path())), "Output path %s already exists", path());
- return null;
- }
- });
- } catch (IOException | InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- @Override
- public Sink.WriteOperation<T, String> createWriteOperation() {
- return new HDFSWriteOperation<>(this, path(), formatClass());
- }
-
- private Job newJob() throws IOException {
- Job job = SerializableConfiguration.newJob(serializableConfiguration());
- job.setJobID(jobId);
- job.setOutputKeyClass(keyClass());
- job.setOutputValueClass(valueClass());
- return job;
- }
-
- // =======================================================================
- // WriteOperation
- // =======================================================================
-
- /** {@link WriteOperation} for HDFS. */
- private static class HDFSWriteOperation<T, K, V> extends WriteOperation<T, String> {
-
- private final HDFSFileSink<T, K, V> sink;
- private final String path;
- private final Class<? extends FileOutputFormat<K, V>> formatClass;
-
- HDFSWriteOperation(HDFSFileSink<T, K, V> sink,
- String path,
- Class<? extends FileOutputFormat<K, V>> formatClass) {
- this.sink = sink;
- this.path = path;
- this.formatClass = formatClass;
- }
-
- @Override
- public void initialize(PipelineOptions options) throws Exception {
- Job job = sink.newJob();
- FileOutputFormat.setOutputPath(job, new Path(path));
- }
-
- @Override
- public void setWindowedWrites(boolean windowedWrites) {
- }
-
- @Override
- public void finalize(final Iterable<String> writerResults, PipelineOptions options)
- throws Exception {
- UGIHelper.getBestUGI(sink.username()).doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- doFinalize(writerResults);
- return null;
- }
- });
- }
-
- private void doFinalize(Iterable<String> writerResults) throws Exception {
- Job job = sink.newJob();
- FileSystem fs = FileSystem.get(new URI(path), job.getConfiguration());
- // If there are 0 output shards, just create output folder.
- if (!writerResults.iterator().hasNext()) {
- fs.mkdirs(new Path(path));
- return;
- }
-
- // job successful
- JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
- FileOutputCommitter outputCommitter = new FileOutputCommitter(new Path(path), context);
- outputCommitter.commitJob(context);
-
- // get actual output shards
- Set<String> actual = Sets.newHashSet();
- FileStatus[] statuses = fs.listStatus(new Path(path), new PathFilter() {
- @Override
- public boolean accept(Path path) {
- String name = path.getName();
- return !name.startsWith("_") && !name.startsWith(".");
- }
- });
-
- // get expected output shards
- Set<String> expected = Sets.newHashSet(writerResults);
- checkState(
- expected.size() == Lists.newArrayList(writerResults).size(),
- "Data loss due to writer results hash collision");
- for (FileStatus s : statuses) {
- String name = s.getPath().getName();
- int pos = name.indexOf('.');
- actual.add(pos > 0 ? name.substring(0, pos) : name);
- }
-
- checkState(actual.equals(expected), "Writer results and output files do not match");
-
- // rename output shards to Hadoop style, i.e. part-r-00000.txt
- int i = 0;
- for (FileStatus s : statuses) {
- String name = s.getPath().getName();
- int pos = name.indexOf('.');
- String ext = pos > 0 ? name.substring(pos) : "";
- fs.rename(
- s.getPath(),
- new Path(s.getPath().getParent(), String.format("part-r-%05d%s", i, ext)));
- i++;
- }
- }
-
- @Override
- public Writer<T, String> createWriter(PipelineOptions options) throws Exception {
- return new HDFSWriter<>(this, path, formatClass);
- }
-
- @Override
- public Sink<T> getSink() {
- return sink;
- }
-
- @Override
- public Coder<String> getWriterResultCoder() {
- return StringUtf8Coder.of();
- }
-
- }
-
- // =======================================================================
- // Writer
- // =======================================================================
-
- private static class HDFSWriter<T, K, V> extends Writer<T, String> {
-
- private final HDFSWriteOperation<T, K, V> writeOperation;
- private final String path;
- private final Class<? extends FileOutputFormat<K, V>> formatClass;
-
- // unique hash for each task
- private int hash;
-
- private TaskAttemptContext context;
- private RecordWriter<K, V> recordWriter;
- private FileOutputCommitter outputCommitter;
-
- HDFSWriter(HDFSWriteOperation<T, K, V> writeOperation,
- String path,
- Class<? extends FileOutputFormat<K, V>> formatClass) {
- this.writeOperation = writeOperation;
- this.path = path;
- this.formatClass = formatClass;
- }
-
- @Override
- public void openWindowed(final String uId,
- BoundedWindow window,
- PaneInfo paneInfo,
- int shard,
- int numShards) throws Exception {
- throw new UnsupportedOperationException("Windowing support not implemented yet for "
- + "HDFS. Window " + window);
- }
-
- @Override
- public void openUnwindowed(final String uId, int shard, int numShards) throws Exception {
- UGIHelper.getBestUGI(writeOperation.sink.username()).doAs(
- new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- doOpen(uId);
- return null;
- }
- }
- );
- }
-
- private void doOpen(String uId) throws Exception {
- this.hash = uId.hashCode();
-
- Job job = writeOperation.sink.newJob();
- FileOutputFormat.setOutputPath(job, new Path(path));
-
- // Each Writer is responsible for writing one bundle of elements and is represented by one
- // unique Hadoop task based on uId/hash. All tasks share the same job ID.
- JobID jobId = job.getJobID();
- TaskID taskId = new TaskID(jobId, TaskType.REDUCE, hash);
- context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID(taskId, 0));
-
- FileOutputFormat<K, V> outputFormat = formatClass.newInstance();
- recordWriter = outputFormat.getRecordWriter(context);
- outputCommitter = (FileOutputCommitter) outputFormat.getOutputCommitter(context);
- }
-
- @Override
- public void write(T value) throws Exception {
- checkNotNull(recordWriter,
- "Record writer can't be null. Make sure to open Writer first!");
- KV<K, V> kv = writeOperation.sink.outputConverter().apply(value);
- recordWriter.write(kv.getKey(), kv.getValue());
- }
-
- @Override
- public void cleanup() throws Exception {
-
- }
-
- @Override
- public String close() throws Exception {
- return UGIHelper.getBestUGI(writeOperation.sink.username()).doAs(
- new PrivilegedExceptionAction<String>() {
- @Override
- public String run() throws Exception {
- return doClose();
- }
- });
- }
-
- private String doClose() throws Exception {
- // task/attempt successful
- recordWriter.close(context);
- outputCommitter.commitTask(context);
-
- // result is prefix of the output file name
- return String.format("part-r-%d", hash);
- }
-
- @Override
- public WriteOperation<T, String> getWriteOperation() {
- return writeOperation;
- }
-
- }
-
-}
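For context: the deleted sink above is a thin wrapper over the plain Hadoop mapreduce output machinery. Each Writer maps one Beam bundle to one task attempt, commitTask runs in Writer.close(), and commitJob runs in WriteOperation.finalize(). A minimal sketch of that underlying lifecycle, assuming only stable Hadoop APIs (hadoop-client on the classpath; the output path is illustrative):

```java
// Sketch of the Hadoop output lifecycle that HDFSFileSink delegated to.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class OutputLifecycleSketch {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration());
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);
    FileOutputFormat.setOutputPath(job, new Path("/tmp/lifecycle-demo")); // illustrative

    // One task attempt corresponds to one Writer/bundle in the deleted sink.
    TaskAttemptContext ctx =
        new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
    TextOutputFormat<NullWritable, Text> format = new TextOutputFormat<>();
    RecordWriter<NullWritable, Text> writer = format.getRecordWriter(ctx);
    writer.write(NullWritable.get(), new Text("hello"));
    writer.close(ctx);

    // commitTask ~ Writer.close() above; commitJob ~ WriteOperation.finalize().
    FileOutputCommitter committer = (FileOutputCommitter) format.getOutputCommitter(ctx);
    committer.commitTask(ctx);
    committer.commitJob(ctx);
  }
}
```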
http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
deleted file mode 100644
index 5cc2097..0000000
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
+++ /dev/null
@@ -1,625 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.security.PrivilegedExceptionAction;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapreduce.AvroKeyInputFormat;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
-import org.apache.beam.sdk.io.hadoop.WritableCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.values.KV;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@code BoundedSource} for reading files resident in a Hadoop filesystem using a
- * Hadoop file-based input format.
- *
- * <p>To read a {@link org.apache.beam.sdk.values.PCollection} of
- * {@link org.apache.beam.sdk.values.KV} key-value pairs from one or more
- * Hadoop files, use {@link HDFSFileSource#from} to specify the path(s) of the files to
- * read, the Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}, the
- * key class and the value class.
- *
- * <p>An {@code HDFSFileSource} can be read from using the
- * {@link org.apache.beam.sdk.io.Read} transform. For example:
- *
- * <pre>
- * {@code
- * HDFSFileSource<KV<MyKey, MyValue>, MyKey, MyValue> source = HDFSFileSource.from(path,
- * MyInputFormat.class, MyKey.class, MyValue.class);
- * PCollection<KV<MyKey, MyValue>> records = pipeline.apply(Read.from(source));
- * }
- * </pre>
- *
- * <p>Implementation note: Since Hadoop's
- * {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}
- * determines the input splits, this class extends {@link BoundedSource} rather than
- * {@link org.apache.beam.sdk.io.OffsetBasedSource}, which dictates its own input splits.
- * @param <T> the type of elements of the result {@link org.apache.beam.sdk.values.PCollection}.
- * @param <K> the type of keys to be read from the source via {@link FileInputFormat}.
- * @param <V> the type of values to be read from the source via {@link FileInputFormat}.
- */
-@AutoValue
-@Experimental
-public abstract class HDFSFileSource<T, K, V> extends BoundedSource<T> {
- private static final long serialVersionUID = 0L;
-
- private static final Logger LOG = LoggerFactory.getLogger(HDFSFileSource.class);
-
- public abstract String filepattern();
- public abstract Class<? extends FileInputFormat<K, V>> formatClass();
- public abstract Coder<T> coder();
- public abstract SerializableFunction<KV<K, V>, T> inputConverter();
- public abstract SerializableConfiguration serializableConfiguration();
- public @Nullable abstract SerializableSplit serializableSplit();
- public @Nullable abstract String username();
- public abstract boolean validateSource();
-
- // =======================================================================
- // Factory methods
- // =======================================================================
-
- public static <T, K, V, W extends FileInputFormat<K, V>> HDFSFileSource<T, K, V>
- from(String filepattern,
- Class<W> formatClass,
- Coder<T> coder,
- SerializableFunction<KV<K, V>, T> inputConverter) {
- return HDFSFileSource.<T, K, V>builder()
- .setFilepattern(filepattern)
- .setFormatClass(formatClass)
- .setCoder(coder)
- .setInputConverter(inputConverter)
- .setConfiguration(null)
- .setUsername(null)
- .setValidateSource(true)
- .setSerializableSplit(null)
- .build();
- }
-
- public static <K, V, W extends FileInputFormat<K, V>> HDFSFileSource<KV<K, V>, K, V>
- from(String filepattern,
- Class<W> formatClass,
- Class<K> keyClass,
- Class<V> valueClass) {
- KvCoder<K, V> coder = KvCoder.of(getDefaultCoder(keyClass), getDefaultCoder(valueClass));
- SerializableFunction<KV<K, V>, KV<K, V>> inputConverter =
- new SerializableFunction<KV<K, V>, KV<K, V>>() {
- @Override
- public KV<K, V> apply(KV<K, V> input) {
- return input;
- }
- };
- return HDFSFileSource.<KV<K, V>, K, V>builder()
- .setFilepattern(filepattern)
- .setFormatClass(formatClass)
- .setCoder(coder)
- .setInputConverter(inputConverter)
- .setConfiguration(null)
- .setUsername(null)
- .setValidateSource(true)
- .setSerializableSplit(null)
- .build();
- }
-
- public static HDFSFileSource<String, LongWritable, Text>
- fromText(String filepattern) {
- SerializableFunction<KV<LongWritable, Text>, String> inputConverter =
- new SerializableFunction<KV<LongWritable, Text>, String>() {
- @Override
- public String apply(KV<LongWritable, Text> input) {
- return input.getValue().toString();
- }
- };
- return from(filepattern, TextInputFormat.class, StringUtf8Coder.of(), inputConverter);
- }
-
- /**
- * Helper to read from an Avro source given an {@link AvroCoder}. Keep in mind that the
- * configuration object is altered to enable Avro input.
- */
- public static <T> HDFSFileSource<T, AvroKey<T>, NullWritable>
- fromAvro(String filepattern, final AvroCoder<T> coder, Configuration conf) {
- Class<AvroKeyInputFormat<T>> formatClass = castClass(AvroKeyInputFormat.class);
- SerializableFunction<KV<AvroKey<T>, NullWritable>, T> inputConverter =
- new SerializableFunction<KV<AvroKey<T>, NullWritable>, T>() {
- @Override
- public T apply(KV<AvroKey<T>, NullWritable> input) {
- try {
- return CoderUtils.clone(coder, input.getKey().datum());
- } catch (CoderException e) {
- throw new RuntimeException(e);
- }
- }
- };
- conf.set("avro.schema.input.key", coder.getSchema().toString());
- return from(filepattern, formatClass, coder, inputConverter).withConfiguration(conf);
- }
-
- /**
- * Helper to read from an Avro source given a {@link Schema}. Keep in mind that the
- * configuration object is altered to enable Avro input.
- */
- public static HDFSFileSource<GenericRecord, AvroKey<GenericRecord>, NullWritable>
- fromAvro(String filepattern, Schema schema, Configuration conf) {
- return fromAvro(filepattern, AvroCoder.of(schema), conf);
- }
-
- /**
- * Helper to read from an Avro source given a {@link Class}. Keep in mind that the
- * configuration object is altered to enable Avro input.
- */
- public static <T> HDFSFileSource<T, AvroKey<T>, NullWritable>
- fromAvro(String filepattern, Class<T> cls, Configuration conf) {
- return fromAvro(filepattern, AvroCoder.of(cls), conf);
- }
-
- // =======================================================================
- // Builder methods
- // =======================================================================
-
- public abstract HDFSFileSource.Builder<T, K, V> toBuilder();
- public static <T, K, V> HDFSFileSource.Builder builder() {
- return new AutoValue_HDFSFileSource.Builder<>();
- }
-
- /**
- * AutoValue builder for {@link HDFSFileSource}.
- */
- @AutoValue.Builder
- public abstract static class Builder<T, K, V> {
- public abstract Builder<T, K, V> setFilepattern(String filepattern);
- public abstract Builder<T, K, V> setFormatClass(
- Class<? extends FileInputFormat<K, V>> formatClass);
- public abstract Builder<T, K, V> setCoder(Coder<T> coder);
- public abstract Builder<T, K, V> setInputConverter(
- SerializableFunction<KV<K, V>, T> inputConverter);
- public abstract Builder<T, K, V> setSerializableConfiguration(
- SerializableConfiguration serializableConfiguration);
- public Builder<T, K, V> setConfiguration(Configuration configuration) {
- if (configuration == null) {
- configuration = new Configuration(false);
- }
- return this.setSerializableConfiguration(new SerializableConfiguration(configuration));
- }
- public abstract Builder<T, K, V> setSerializableSplit(SerializableSplit serializableSplit);
- public abstract Builder<T, K, V> setUsername(@Nullable String username);
- public abstract Builder<T, K, V> setValidateSource(boolean validate);
- public abstract HDFSFileSource<T, K, V> build();
- }
-
- public HDFSFileSource<T, K, V> withConfiguration(@Nullable Configuration configuration) {
- return this.toBuilder().setConfiguration(configuration).build();
- }
-
- public HDFSFileSource<T, K, V> withUsername(@Nullable String username) {
- return this.toBuilder().setUsername(username).build();
- }
-
- // =======================================================================
- // BoundedSource
- // =======================================================================
-
- @Override
- public List<? extends BoundedSource<T>> split(
- final long desiredBundleSizeBytes,
- PipelineOptions options) throws Exception {
- if (serializableSplit() == null) {
- List<InputSplit> inputSplits = UGIHelper.getBestUGI(username()).doAs(
- new PrivilegedExceptionAction<List<InputSplit>>() {
- @Override
- public List<InputSplit> run() throws Exception {
- return computeSplits(desiredBundleSizeBytes, serializableConfiguration());
- }
- });
- return Lists.transform(inputSplits,
- new Function<InputSplit, BoundedSource<T>>() {
- @Override
- public BoundedSource<T> apply(@Nullable InputSplit inputSplit) {
- SerializableSplit serializableSplit = new SerializableSplit(inputSplit);
- return HDFSFileSource.this.toBuilder()
- .setSerializableSplit(serializableSplit)
- .build();
- }
- });
- } else {
- return ImmutableList.of(this);
- }
- }
-
- @Override
- public long getEstimatedSizeBytes(PipelineOptions options) {
- long size = 0;
-
- try {
- // If this source was produced by split(), return the size of that split
- // rather than the entire input.
- if (serializableSplit() != null) {
- return serializableSplit().getSplit().getLength();
- }
-
- size += UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Long>() {
- @Override
- public Long run() throws Exception {
- long size = 0;
- Job job = SerializableConfiguration.newJob(serializableConfiguration());
- for (FileStatus st : listStatus(createFormat(job), job)) {
- size += st.getLen();
- }
- return size;
- }
- });
- } catch (IOException e) {
- LOG.warn(
- "Will estimate size of input to be 0 bytes. Can't estimate size of the input due to:", e);
- // ignore, and return 0
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.warn(
- "Will estimate size of input to be 0 bytes. Can't estimate size of the input due to:", e);
- // ignore, and return 0
- }
- return size;
- }
-
- @Override
- public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
- this.validate();
- return new HDFSFileReader<>(this, filepattern(), formatClass(), serializableSplit());
- }
-
- @Override
- public void validate() {
- if (validateSource()) {
- try {
- UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- final Path pathPattern = new Path(filepattern());
- FileSystem fs = FileSystem.get(pathPattern.toUri(),
- SerializableConfiguration.newConfiguration(serializableConfiguration()));
- FileStatus[] fileStatuses = fs.globStatus(pathPattern);
- checkState(
- fileStatuses != null && fileStatuses.length > 0,
- "Unable to find any files matching %s", filepattern());
- return null;
- }
- });
- } catch (IOException | InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- @Override
- public Coder<T> getDefaultOutputCoder() {
- return coder();
- }
-
- // =======================================================================
- // Helpers
- // =======================================================================
-
- private List<InputSplit> computeSplits(long desiredBundleSizeBytes,
- SerializableConfiguration serializableConfiguration)
- throws IOException, IllegalAccessException, InstantiationException {
- Job job = SerializableConfiguration.newJob(serializableConfiguration);
- FileInputFormat.setMinInputSplitSize(job, desiredBundleSizeBytes);
- FileInputFormat.setMaxInputSplitSize(job, desiredBundleSizeBytes);
- return createFormat(job).getSplits(job);
- }
-
- private FileInputFormat<K, V> createFormat(Job job)
- throws IOException, IllegalAccessException, InstantiationException {
- Path path = new Path(filepattern());
- FileInputFormat.addInputPath(job, path);
- return formatClass().newInstance();
- }
-
- private List<FileStatus> listStatus(FileInputFormat<K, V> format, Job job)
- throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
- // FileInputFormat#listStatus is protected, so call using reflection
- Method listStatus = FileInputFormat.class.getDeclaredMethod("listStatus", JobContext.class);
- listStatus.setAccessible(true);
- @SuppressWarnings("unchecked")
- List<FileStatus> stat = (List<FileStatus>) listStatus.invoke(format, job);
- return stat;
- }
-
- @SuppressWarnings("unchecked")
- private static <T> Coder<T> getDefaultCoder(Class<T> c) {
- if (Writable.class.isAssignableFrom(c)) {
- Class<? extends Writable> writableClass = (Class<? extends Writable>) c;
- return (Coder<T>) WritableCoder.of(writableClass);
- } else if (Void.class.equals(c)) {
- return (Coder<T>) VoidCoder.of();
- }
- // TODO: how to use registered coders here?
- throw new IllegalStateException("Cannot find coder for " + c);
- }
-
- @SuppressWarnings("unchecked")
- private static <T> Class<T> castClass(Class<?> aClass) {
- return (Class<T>) aClass;
- }
-
- // =======================================================================
- // BoundedReader
- // =======================================================================
-
- private static class HDFSFileReader<T, K, V> extends BoundedSource.BoundedReader<T> {
-
- private final HDFSFileSource<T, K, V> source;
- private final String filepattern;
- private final Class<? extends FileInputFormat<K, V>> formatClass;
- private final Job job;
-
- private List<InputSplit> splits;
- private ListIterator<InputSplit> splitsIterator;
-
- private Configuration conf;
- private FileInputFormat<?, ?> format;
- private TaskAttemptContext attemptContext;
- private RecordReader<K, V> currentReader;
- private KV<K, V> currentPair;
-
- HDFSFileReader(
- HDFSFileSource<T, K, V> source,
- String filepattern,
- Class<? extends FileInputFormat<K, V>> formatClass,
- SerializableSplit serializableSplit)
- throws IOException {
- this.source = source;
- this.filepattern = filepattern;
- this.formatClass = formatClass;
- this.job = SerializableConfiguration.newJob(source.serializableConfiguration());
-
- if (serializableSplit != null) {
- this.splits = ImmutableList.of(serializableSplit.getSplit());
- this.splitsIterator = splits.listIterator();
- }
- }
-
- @Override
- public boolean start() throws IOException {
- Path path = new Path(filepattern);
- FileInputFormat.addInputPath(job, path);
-
- conf = job.getConfiguration();
- try {
- format = formatClass.newInstance();
- } catch (InstantiationException | IllegalAccessException e) {
- throw new IOException("Cannot instantiate file input format " + formatClass, e);
- }
- attemptContext = new TaskAttemptContextImpl(conf, new TaskAttemptID());
-
- if (splitsIterator == null) {
- splits = format.getSplits(job);
- splitsIterator = splits.listIterator();
- }
-
- return advance();
- }
-
- @Override
- public boolean advance() throws IOException {
- try {
- if (currentReader != null && currentReader.nextKeyValue()) {
- currentPair = nextPair();
- return true;
- } else {
- while (splitsIterator.hasNext()) {
- // advance the reader and see if it has records
- final InputSplit nextSplit = splitsIterator.next();
- @SuppressWarnings("unchecked")
- RecordReader<K, V> reader =
- (RecordReader<K, V>) format.createRecordReader(nextSplit, attemptContext);
- if (currentReader != null) {
- currentReader.close();
- }
- currentReader = reader;
- UGIHelper.getBestUGI(source.username()).doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- currentReader.initialize(nextSplit, attemptContext);
- return null;
- }
- });
- if (currentReader.nextKeyValue()) {
- currentPair = nextPair();
- return true;
- }
- currentReader.close();
- currentReader = null;
- }
- // either no next split or all readers were empty
- currentPair = null;
- return false;
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException(e);
- }
- }
-
- @Override
- public T getCurrent() throws NoSuchElementException {
- if (currentPair == null) {
- throw new NoSuchElementException();
- }
- return source.inputConverter().apply(currentPair);
- }
-
- @Override
- public void close() throws IOException {
- if (currentReader != null) {
- currentReader.close();
- currentReader = null;
- }
- currentPair = null;
- }
-
- @Override
- public BoundedSource<T> getCurrentSource() {
- return source;
- }
-
- @SuppressWarnings("unchecked")
- private KV<K, V> nextPair() throws IOException, InterruptedException {
- K key = currentReader.getCurrentKey();
- V value = currentReader.getCurrentValue();
- // clone Writable objects since they are reused between calls to RecordReader#nextKeyValue
- if (key instanceof Writable) {
- key = (K) WritableUtils.clone((Writable) key, conf);
- }
- if (value instanceof Writable) {
- value = (V) WritableUtils.clone((Writable) value, conf);
- }
- return KV.of(key, value);
- }
-
- // =======================================================================
- // Optional overrides
- // =======================================================================
-
- @Override
- public Double getFractionConsumed() {
- if (currentReader == null) {
- return 0.0;
- }
- if (splits.isEmpty()) {
- return 1.0;
- }
- int index = splitsIterator.previousIndex();
- int numReaders = splits.size();
- if (index == numReaders) {
- return 1.0;
- }
- double before = 1.0 * index / numReaders;
- double after = 1.0 * (index + 1) / numReaders;
- Double fractionOfCurrentReader = getProgress();
- if (fractionOfCurrentReader == null) {
- return before;
- }
- return before + fractionOfCurrentReader * (after - before);
- }
-
- private Double getProgress() {
- try {
- return (double) currentReader.getProgress();
- } catch (IOException | InterruptedException e) {
- return null;
- }
- }
-
- }
-
- // =======================================================================
- // SerializableSplit
- // =======================================================================
-
- /**
- * A wrapper to allow Hadoop {@link org.apache.hadoop.mapreduce.InputSplit}s to be
- * serialized using Java's standard serialization mechanisms. Note that the InputSplit
- * has to be Writable (which most are).
- */
- protected static class SerializableSplit implements Externalizable {
- private static final long serialVersionUID = 0L;
-
- private InputSplit split;
-
- public SerializableSplit() {
- }
-
- public SerializableSplit(InputSplit split) {
- checkArgument(split instanceof Writable, "Split is not writable: %s", split);
- this.split = split;
- }
-
- public InputSplit getSplit() {
- return split;
- }
-
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeUTF(split.getClass().getCanonicalName());
- ((Writable) split).write(out);
- }
-
- @Override
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- String className = in.readUTF();
- try {
- split = (InputSplit) Class.forName(className).newInstance();
- ((Writable) split).readFields(in);
- } catch (InstantiationException | IllegalAccessException e) {
- throw new IOException(e);
- }
- }
- }
-
-}
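
The deleted reader's getFractionConsumed interpolates linearly between split boundaries.
A minimal, dependency-free sketch of just that arithmetic, using hypothetical values that
are not taken from the code above:

public class FractionConsumedSketch {

  // Overall progress from the current split index, the total number of
  // splits, and the in-split progress reported by the record reader.
  static double fractionConsumed(int splitIndex, int numSplits, Double inSplitProgress) {
    if (numSplits == 0) {
      return 1.0;
    }
    double before = (double) splitIndex / numSplits;
    double after = (double) (splitIndex + 1) / numSplits;
    if (inSplitProgress == null) {
      // The current reader could not report progress; fall back to the boundary.
      return before;
    }
    return before + inSplitProgress * (after - before);
  }

  public static void main(String[] args) {
    // Second of four splits, halfway through it: 0.25 + 0.5 * 0.25 = 0.375.
    System.out.println(fractionConsumed(1, 4, 0.5));
  }
}
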
http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
deleted file mode 100644
index fe2db5f..0000000
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-
-/**
- * This class is deprecated, and only exists for HDFSFileSink.
- */
-@Deprecated
-public abstract class Sink<T> implements Serializable, HasDisplayData {
- /**
- * Ensures that the sink is valid and can be written to before the write operation begins. One
- * should use {@link com.google.common.base.Preconditions} to implement this method.
- */
- public abstract void validate(PipelineOptions options);
-
- /**
- * Returns an instance of a {@link WriteOperation} that can write to this Sink.
- */
- public abstract WriteOperation<T, ?> createWriteOperation();
-
- /**
- * {@inheritDoc}
- *
- * <p>By default, does not register any display data. Implementors may override this method
- * to provide their own display data.
- */
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {}
-
- /**
- * A {@link WriteOperation} defines the process of a parallel write of objects to a Sink.
- *
- * <p>The {@code WriteOperation} defines how to perform initialization and finalization of a
- * parallel write to a sink as well as how to create a {@link Sink.Writer} object that can write
- * a bundle to the sink.
- *
- * <p>Since operations in Beam may be run multiple times for redundancy or fault-tolerance,
- * the initialization and finalization defined by a WriteOperation <b>must be idempotent</b>.
- *
- * <p>{@code WriteOperation}s may be mutable; a {@code WriteOperation} is serialized after the
- * call to the {@code initialize} method and deserialized before calls to
- * {@code createWriter} and {@code finalize}. However, it is not
- * reserialized after {@code createWriter}, so {@code createWriter} should not mutate the
- * state of the {@code WriteOperation}.
- *
- * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink.
- *
- * @param <T> The type of objects to write
- * @param <WriteT> The result of a per-bundle write
- */
- public abstract static class WriteOperation<T, WriteT> implements Serializable {
- /**
- * Performs initialization before writing to the sink. Called before writing begins.
- */
- public abstract void initialize(PipelineOptions options) throws Exception;
-
- /**
- * Indicates that the operation will be performing windowed writes.
- */
- public abstract void setWindowedWrites(boolean windowedWrites);
-
- /**
- * Given an Iterable of results from bundle writes, performs finalization after writing and
- * closes the sink. Called after all bundle writes are complete.
- *
- * <p>The results that are passed to finalize are those returned by bundles that completed
- * successfully. Although bundles may have been run multiple times (for fault-tolerance), only
- * one writer result will be passed to finalize for each bundle. An implementation of finalize
- * should perform clean up of any failed and successfully retried bundles. Note that these
- * failed bundles will not have their writer result passed to finalize, so finalize should be
- * capable of locating any temporary/partial output written by failed bundles.
- *
- * <p>A best practice is to make finalize atomic. If this is impossible given the semantics
- * of the sink, finalize should be idempotent, as it may be called multiple times in the case of
- * failure/retry or for redundancy.
- *
- * <p>Note that the iteration order of the writer results is not guaranteed to be consistent if
- * finalize is called multiple times.
- *
- * @param writerResults an Iterable of results from successful bundle writes.
- */
- public abstract void finalize(Iterable<WriteT> writerResults, PipelineOptions options)
- throws Exception;
-
- /**
- * Creates a new {@link Sink.Writer} to write a bundle of the input to the sink.
- *
- * <p>The bundle id that the writer will use to uniquely identify its output will be passed to
- * {@link Writer#openWindowed} or {@link Writer#openUnwindowed}.
- *
- * <p>Must not mutate the state of the WriteOperation.
- */
- public abstract Writer<T, WriteT> createWriter(PipelineOptions options) throws Exception;
-
- /**
- * Returns the Sink that this write operation writes to.
- */
- public abstract Sink<T> getSink();
-
- /**
- * Returns a coder for the writer result type.
- */
- public abstract Coder<WriteT> getWriterResultCoder();
- }
-
- /**
- * A Writer writes a bundle of elements from a PCollection to a sink.
- * {@link Writer#openWindowed} or {@link Writer#openUnwindowed} is called before writing begins
- * and {@link Writer#close} is called after all elements in the bundle have been written.
- * {@link Writer#write} writes an element to the sink.
- *
- * <p>Note that any access to static members or methods of a Writer must be thread-safe, as
- * multiple instances of a Writer may be instantiated in different threads on the same worker.
- *
- * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink.
- *
- * @param <T> The type of object to write
- * @param <WriteT> The writer results type (e.g., the bundle's output filename, as String)
- */
- public abstract static class Writer<T, WriteT> {
- /**
- * Performs bundle initialization. For example, creates a temporary file for writing or
- * initializes any state that will be used across calls to {@link Writer#write}.
- *
- * <p>The unique id that is given to open should be used to ensure that the writer's output does
- * not interfere with the output of other Writers, as a bundle may be executed many times for
- * fault tolerance. See {@link Sink} for more information about bundle ids.
- *
- * <p>The window and paneInfo arguments are populated when windowed writes are requested.
- * The shard and numShards arguments are populated for the case of static sharding. In cases
- * where the runner is dynamically picking sharding, shard and numShards might both be set to -1.
- */
- public abstract void openWindowed(String uId,
- BoundedWindow window,
- PaneInfo paneInfo,
- int shard,
- int numShards) throws Exception;
-
- /**
- * Perform bundle initialization for the case where the file is written unwindowed.
- */
- public abstract void openUnwindowed(String uId,
- int shard,
- int numShards) throws Exception;
-
- public abstract void cleanup() throws Exception;
-
- /**
- * Called for each value in the bundle.
- */
- public abstract void write(T value) throws Exception;
-
- /**
- * Finishes writing the bundle. Closes any resources used for writing the bundle.
- *
- * <p>Returns a writer result that will be used in the {@link Sink.WriteOperation}'s
- * finalization. The result should contain some way to identify the output of this bundle (using
- * the bundle id). {@link WriteOperation#finalize} will use the writer result to identify
- * successful writes. See {@link Sink} for more information about bundle ids.
- *
- * @return the writer result
- */
- public abstract WriteT close() throws Exception;
-
- /**
- * Returns the write operation this writer belongs to.
- */
- public abstract WriteOperation<T, WriteT> getWriteOperation();
-
-
- }
-}
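
The deleted Sink encodes a three-phase lifecycle: validate and createWriteOperation at graph
construction, initialize once, then per bundle createWriter / open / write / close, and finally
finalize over the successful writer results. A dependency-free sketch of that sequence, using
simplified stand-in types rather than the Beam classes above:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

public class SinkLifecycleSketch {

  // Stand-in for Sink.Writer<T, WriteT>, with WriteT = String (a bundle id).
  static class BundleWriter {
    private final List<String> written = new ArrayList<>();
    private String bundleId;

    void openUnwindowed(String uId) {
      bundleId = uId;
    }

    void write(String value) {
      written.add(value);
    }

    // close() returns the writer result consumed during finalization.
    String close() {
      System.out.println("bundle " + bundleId + " wrote " + written.size() + " records");
      return bundleId;
    }
  }

  public static void main(String[] args) {
    List<List<String>> bundles =
        Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));
    List<String> writerResults = new ArrayList<>();
    // One writer per bundle; bundles may be retried, so the ids must be unique.
    for (List<String> bundle : bundles) {
      BundleWriter writer = new BundleWriter();
      writer.openUnwindowed(UUID.randomUUID().toString());
      for (String value : bundle) {
        writer.write(value);
      }
      writerResults.add(writer.close());
    }
    // finalize(Iterable<WriteT>) sees only results from successful bundles.
    System.out.println("finalizing with results: " + writerResults);
  }
}
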
http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java
deleted file mode 100644
index fd05a19..0000000
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import java.io.IOException;
-import javax.annotation.Nullable;
-import org.apache.hadoop.security.UserGroupInformation;
-
-/**
- * {@link UserGroupInformation} helper methods.
- */
-public class UGIHelper {
-
- /**
- * Find the most appropriate UserGroupInformation to use.
- * @param username the user name, or null if none is specified.
- * @return the most appropriate UserGroupInformation
- */
- public static UserGroupInformation getBestUGI(@Nullable String username) throws IOException {
- return UserGroupInformation.getBestUGI(null, username);
- }
-
-}
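
UGIHelper is a thin wrapper over Hadoop's UserGroupInformation.getBestUGI; callers run
filesystem access under the resolved user via doAs, as the deleted source did when validating
its file pattern. A hedged sketch (assumes Hadoop on the classpath; the path and username
below are placeholders, not values from this commit):

import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class UgiGlobSketch {
  public static void main(String[] args) throws Exception {
    final Configuration conf = new Configuration();
    // Placeholder path and username for illustration only.
    final Path pattern = new Path("hdfs://namenode/data/part-*");
    UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, "someuser");
    FileStatus[] matches = ugi.doAs(new PrivilegedExceptionAction<FileStatus[]>() {
      @Override
      public FileStatus[] run() throws Exception {
        FileSystem fs = FileSystem.get(pattern.toUri(), conf);
        return fs.globStatus(pattern);
      }
    });
    System.out.println(matches == null ? 0 : matches.length);
  }
}
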
http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
deleted file mode 100644
index ef6556e..0000000
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
+++ /dev/null
@@ -1,588 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.hdfs.Sink.WriteOperation;
-import org.apache.beam.sdk.io.hdfs.Sink.Writer;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PDone;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class is deprecated, and currently only exists for HDFSFileSink.
- */
-@Deprecated
-public class Write<T> extends PTransform<PCollection<T>, PDone> {
- private static final Logger LOG = LoggerFactory.getLogger(Write.class);
-
- private static final int UNKNOWN_SHARDNUM = -1;
- private static final int UNKNOWN_NUMSHARDS = -1;
-
- private final Sink<T> sink;
- // This allows the number of shards to be dynamically computed based on the input
- // PCollection.
- @Nullable
- private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards;
- // We don't use a side input for static sharding, as we want this value to be updatable
- // when a pipeline is updated.
- @Nullable
- private final ValueProvider<Integer> numShardsProvider;
- private boolean windowedWrites;
-
- /**
- * Creates a {@link Write} transform that writes to the given {@link Sink}, letting the runner
- * control how many different shards are produced.
- */
- public static <T> Write<T> to(Sink<T> sink) {
- checkNotNull(sink, "sink");
- return new Write<>(sink, null /* runner-determined sharding */, null, false);
- }
-
- private Write(
- Sink<T> sink,
- @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards,
- @Nullable ValueProvider<Integer> numShardsProvider,
- boolean windowedWrites) {
- this.sink = sink;
- this.computeNumShards = computeNumShards;
- this.numShardsProvider = numShardsProvider;
- this.windowedWrites = windowedWrites;
- }
-
- @Override
- public PDone expand(PCollection<T> input) {
- checkArgument(IsBounded.BOUNDED == input.isBounded() || windowedWrites,
- "%s can only be applied to an unbounded PCollection if doing windowed writes",
- Write.class.getSimpleName());
- return createWrite(input, sink.createWriteOperation());
- }
-
- @Override
- public void validate(PipelineOptions options) {
- sink.validate(options);
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder
- .add(DisplayData.item("sink", sink.getClass()).withLabel("Write Sink"))
- .include("sink", sink);
- if (getSharding() != null) {
- builder.include("sharding", getSharding());
- } else if (getNumShards() != null) {
- String numShards = getNumShards().isAccessible()
- ? getNumShards().get().toString() : getNumShards().toString();
- builder.add(DisplayData.item("numShards", numShards)
- .withLabel("Fixed Number of Shards"));
- }
- }
-
- /**
- * Returns the {@link Sink} associated with this PTransform.
- */
- public Sink<T> getSink() {
- return sink;
- }
-
- /**
- * Gets the {@link PTransform} that will be used to determine sharding. This can be either a
- * static number of shards (following a call to {@link #withNumShards(int)}), dynamic (by
- * {@link #withSharding(PTransform)}), or runner-determined (by
- * {@link #withRunnerDeterminedSharding()}).
- */
- @Nullable
- public PTransform<PCollection<T>, PCollectionView<Integer>> getSharding() {
- return computeNumShards;
- }
-
- public ValueProvider<Integer> getNumShards() {
- return numShardsProvider;
- }
-
- /**
- * Returns a new {@link Write} that will write to the current {@link Sink} using the
- * specified number of shards.
- *
- * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
- * more information.
- *
- * <p>A value less than or equal to 0 will be equivalent to the default behavior of
- * runner-determined sharding.
- */
- public Write<T> withNumShards(int numShards) {
- if (numShards > 0) {
- return withNumShards(StaticValueProvider.of(numShards));
- }
- return withRunnerDeterminedSharding();
- }
-
- /**
- * Returns a new {@link Write} that will write to the current {@link Sink} using the
- * {@link ValueProvider} specified number of shards.
- *
- * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
- * more information.
- */
- public Write<T> withNumShards(ValueProvider<Integer> numShardsProvider) {
- return new Write<>(sink, null, numShardsProvider, windowedWrites);
- }
-
- /**
- * Returns a new {@link Write} that will write to the current {@link Sink} using the
- * specified {@link PTransform} to compute the number of shards.
- *
- * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
- * more information.
- */
- public Write<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) {
- checkNotNull(
- sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead");
- return new Write<>(sink, sharding, null, windowedWrites);
- }
-
- /**
- * Returns a new {@link Write} that will write to the current {@link Sink} with
- * runner-determined sharding.
- */
- public Write<T> withRunnerDeterminedSharding() {
- return new Write<>(sink, null, null, windowedWrites);
- }
-
- /**
- * Returns a new {@link Write} that preserves windowing on its input.
- *
- * <p>If this option is not specified, windowing and triggering are replaced by
- * {@link GlobalWindows} and {@link DefaultTrigger}.
- *
- * <p>If there is no data for a window, no output shards will be generated for that window.
- * If a window triggers multiple times, more than one output shard might be generated
- * for that window; it is up to the sink implementation to keep these output shards
- * unique.
- *
- * <p>This option can only be used if {@link #withNumShards(int)} is also set to a
- * positive value.
- */
- public Write<T> withWindowedWrites() {
- return new Write<>(sink, computeNumShards, numShardsProvider, true);
- }
-
- /**
- * Writes all the elements in a bundle using a {@link Writer} produced by the
- * {@link WriteOperation} associated with the {@link Sink}.
- */
- private class WriteBundles<WriteT> extends DoFn<T, WriteT> {
- // Writer that will write the records in this bundle. Lazily
- // initialized in processElement.
- private Writer<T, WriteT> writer = null;
- private BoundedWindow window;
- private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
-
- WriteBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
- this.writeOperationView = writeOperationView;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
- // Lazily initialize the Writer
- if (writer == null) {
- WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
- LOG.info("Opening writer for write operation {}", writeOperation);
- writer = writeOperation.createWriter(c.getPipelineOptions());
-
- if (windowedWrites) {
- writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), UNKNOWN_SHARDNUM,
- UNKNOWN_NUMSHARDS);
- } else {
- writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS);
- }
- this.window = window;
- LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
- }
- try {
- writer.write(c.element());
- } catch (Exception e) {
- // Discard write result and close the write.
- try {
- writer.close();
- // The writer does not need to be reset, as this DoFn cannot be reused.
- } catch (Exception closeException) {
- if (closeException instanceof InterruptedException) {
- // Do not silently ignore interrupted state.
- Thread.currentThread().interrupt();
- }
- // Do not mask the exception that caused the write to fail.
- e.addSuppressed(closeException);
- }
- throw e;
- }
- }
-
- @FinishBundle
- public void finishBundle(FinishBundleContext c) throws Exception {
- if (writer != null) {
- WriteT result = writer.close();
- c.output(result, window.maxTimestamp(), window);
- // Reset state in case of reuse.
- writer = null;
- window = null;
- }
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- builder.delegate(Write.this);
- }
- }
-
- /**
- * Like {@link WriteBundles}, but where the elements for each shard have been collected into
- * a single iterable.
- *
- * @see WriteBundles
- */
- private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> {
- private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
- private final PCollectionView<Integer> numShardsView;
-
- WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView,
- PCollectionView<Integer> numShardsView) {
- this.writeOperationView = writeOperationView;
- this.numShardsView = numShardsView;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
- int numShards = numShardsView != null ? c.sideInput(numShardsView) : getNumShards().get();
- // In a sharded write, a single input element represents one shard. We can open and close
- // the writer in each call to processElement.
- WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
- LOG.info("Opening writer for write operation {}", writeOperation);
- Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
- if (windowedWrites) {
- writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey(),
- numShards);
- } else {
- writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS);
- }
- LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
-
- try {
- try {
- for (T t : c.element().getValue()) {
- writer.write(t);
- }
- } catch (Exception e) {
- try {
- writer.close();
- } catch (Exception closeException) {
- if (closeException instanceof InterruptedException) {
- // Do not silently ignore interrupted state.
- Thread.currentThread().interrupt();
- }
- // Do not mask the exception that caused the write to fail.
- e.addSuppressed(closeException);
- }
- throw e;
- }
-
- // Close the writer; if this throws let the error propagate.
- WriteT result = writer.close();
- c.output(result);
- } catch (Exception e) {
- // If anything goes wrong, make sure to delete the temporary file.
- writer.cleanup();
- throw e;
- }
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- builder.delegate(Write.this);
- }
- }
-
- private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> {
- private final PCollectionView<Integer> numShardsView;
- private final ValueProvider<Integer> numShardsProvider;
- private int shardNumber;
-
- ApplyShardingKey(PCollectionView<Integer> numShardsView,
- ValueProvider<Integer> numShardsProvider) {
- this.numShardsView = numShardsView;
- this.numShardsProvider = numShardsProvider;
- shardNumber = UNKNOWN_SHARDNUM;
- }
-
- @ProcessElement
- public void processElement(ProcessContext context) {
- int shardCount = 0;
- if (numShardsView != null) {
- shardCount = context.sideInput(numShardsView);
- } else {
- checkNotNull(numShardsProvider);
- shardCount = numShardsProvider.get();
- }
- checkArgument(
- shardCount > 0,
- "Must have a positive number of shards specified for non-runner-determined sharding."
- + " Got %s",
- shardCount);
- if (shardNumber == UNKNOWN_SHARDNUM) {
- // We want to desynchronize the first record sharding key for each instance of
- // ApplyShardingKey, so records in a small PCollection will be statistically balanced.
- shardNumber = ThreadLocalRandom.current().nextInt(shardCount);
- } else {
- shardNumber = (shardNumber + 1) % shardCount;
- }
- context.output(KV.of(shardNumber, context.element()));
- }
- }
-
- /**
- * A write is performed as a sequence of three {@link ParDo}s.
- *
- * <p>In the first, a do-once ParDo is applied to a singleton PCollection containing the Sink's
- * {@link WriteOperation}. In this initialization ParDo, {@link WriteOperation#initialize} is
- * called. The output of this ParDo is a singleton PCollection
- * containing the WriteOperation.
- *
- * <p>This singleton collection containing the WriteOperation is then used as a side input to a
- * ParDo over the PCollection of elements to write. In this bundle-writing phase,
- * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
- * {@link Writer#openWindowed} or {@link Writer#openUnwindowed} is called when the bundle's
- * first element is processed, {@link Writer#close} is called in {@link DoFn#finishBundle},
- * and {@link Writer#write} is called for every element in the bundle. The output of this
- * ParDo is a PCollection of <i>writer result</i> objects (see {@link Sink} for a description
- * of writer results), one for each bundle.
- *
- * <p>The final do-once ParDo uses the singleton collection of the WriteOperation as input and
- * the collection of writer results as a side-input. In this ParDo,
- * {@link WriteOperation#finalize} is called to finalize the write.
- *
- * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called
- * before the exception that caused the write to fail is propagated and the write result will be
- * discarded.
- *
- * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and
- * deserialized in the bundle-writing and finalization phases, any state change to the
- * WriteOperation object that occurs during initialization is visible in the latter phases.
- * However, the WriteOperation is not serialized after the bundle-writing phase. This is why
- * implementations should guarantee that {@link WriteOperation#createWriter} does not mutate
- * the WriteOperation.
- */
- private <WriteT> PDone createWrite(
- PCollection<T> input, WriteOperation<T, WriteT> writeOperation) {
- Pipeline p = input.getPipeline();
- writeOperation.setWindowedWrites(windowedWrites);
-
- // A coder to use for the WriteOperation.
- @SuppressWarnings("unchecked")
- Coder<WriteOperation<T, WriteT>> operationCoder =
- (Coder<WriteOperation<T, WriteT>>) SerializableCoder.of(writeOperation.getClass());
-
- // A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize
- // the sink.
- PCollection<WriteOperation<T, WriteT>> operationCollection =
- p.apply("CreateOperationCollection", Create.of(writeOperation).withCoder(operationCoder));
-
- // Initialize the resource in a do-once ParDo on the WriteOperation.
- operationCollection = operationCollection
- .apply("Initialize", ParDo.of(
- new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- WriteOperation<T, WriteT> writeOperation = c.element();
- LOG.info("Initializing write operation {}", writeOperation);
- writeOperation.initialize(c.getPipelineOptions());
- writeOperation.setWindowedWrites(windowedWrites);
- LOG.debug("Done initializing write operation {}", writeOperation);
- // The WriteOperation is also the output of this ParDo, so it can have mutable
- // state.
- c.output(writeOperation);
- }
- }))
- .setCoder(operationCoder);
-
- // Create a view of the WriteOperation to be used as a sideInput to the parallel write phase.
- final PCollectionView<WriteOperation<T, WriteT>> writeOperationView =
- operationCollection.apply(View.<WriteOperation<T, WriteT>>asSingleton());
-
- if (!windowedWrites) {
- // Re-window the data into the global window and remove any existing triggers.
- input =
- input.apply(
- Window.<T>into(new GlobalWindows())
- .triggering(DefaultTrigger.of())
- .discardingFiredPanes());
- }
-
-
- // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation
- // as a side input) and collect the results of the writes in a PCollection.
- // There is a dependency between this ParDo and the first (the WriteOperation PCollection
- // as a side input), so this will happen after the initial ParDo.
- PCollection<WriteT> results;
- final PCollectionView<Integer> numShardsView;
- if (computeNumShards == null && numShardsProvider == null) {
- if (windowedWrites) {
- throw new IllegalStateException("When doing windowed writes, numShards must be set"
- + " explicitly to a positive value");
- }
- numShardsView = null;
- results = input
- .apply("WriteBundles",
- ParDo.of(new WriteBundles<>(writeOperationView))
- .withSideInputs(writeOperationView));
- } else {
- if (computeNumShards != null) {
- numShardsView = input.apply(computeNumShards);
- results = input
- .apply("ApplyShardLabel", ParDo.of(
- new ApplyShardingKey<T>(numShardsView, null)).withSideInputs(numShardsView))
- .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
- .apply("WriteShardedBundles",
- ParDo.of(new WriteShardedBundles<>(writeOperationView, numShardsView))
- .withSideInputs(numShardsView, writeOperationView));
- } else {
- numShardsView = null;
- results = input
- .apply("ApplyShardLabel", ParDo.of(new ApplyShardingKey<T>(null, numShardsProvider)))
- .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
- .apply("WriteShardedBundles",
- ParDo.of(new WriteShardedBundles<>(writeOperationView, null))
- .withSideInputs(writeOperationView));
- }
- }
- results.setCoder(writeOperation.getWriterResultCoder());
-
- if (windowedWrites) {
- // When processing streaming windowed writes, results will arrive multiple times. This
- // means we can't share the below implementation that turns the results into a side input,
- // as new data arriving into a side input does not trigger the listening DoFn. Instead
- // we aggregate the result set using a singleton GroupByKey, so the DoFn will be triggered
- // whenever new data arrives.
- PCollection<KV<Void, WriteT>> keyedResults =
- results.apply("AttachSingletonKey", WithKeys.<Void, WriteT>of((Void) null));
- keyedResults.setCoder(KvCoder.<Void, WriteT>of(VoidCoder.of(), writeOperation
- .getWriterResultCoder()));
-
- // Is the continuation trigger sufficient?
- keyedResults
- .apply("FinalizeGroupByKey", GroupByKey.<Void, WriteT>create())
- .apply("Finalize", ParDo.of(new DoFn<KV<Void, Iterable<WriteT>>, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
- LOG.info("Finalizing write operation {}.", writeOperation);
- List<WriteT> results = Lists.newArrayList(c.element().getValue());
- writeOperation.finalize(results, c.getPipelineOptions());
- LOG.debug("Done finalizing write operation {}", writeOperation);
- }
- }).withSideInputs(writeOperationView));
- } else {
- final PCollectionView<Iterable<WriteT>> resultsView =
- results.apply(View.<WriteT>asIterable());
- ImmutableList.Builder<PCollectionView<?>> sideInputs =
- ImmutableList.<PCollectionView<?>>builder().add(resultsView);
- if (numShardsView != null) {
- sideInputs.add(numShardsView);
- }
-
- // Finalize the write in another do-once ParDo on the singleton collection containing the
- // Writer. The results from the per-bundle writes are given as an Iterable side input.
- // The WriteOperation's state is the same as after its initialization in the first do-once
- // ParDo. There is a dependency between this ParDo and the parallel write (the writer
- // results collection as a side input), so it will happen after the parallel write.
- // For the non-windowed case, we guarantee that if no data is written but the user has
- // set numShards, then all shards will be written out as empty files. For this reason we
- // use a side input here.
- operationCollection
- .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- WriteOperation<T, WriteT> writeOperation = c.element();
- LOG.info("Finalizing write operation {}.", writeOperation);
- List<WriteT> results = Lists.newArrayList(c.sideInput(resultsView));
- LOG.debug("Side input initialized to finalize write operation {}.", writeOperation);
-
- // We must always output at least 1 shard, and honor user-specified numShards if
- // set.
- int minShardsNeeded;
- if (numShardsView != null) {
- minShardsNeeded = c.sideInput(numShardsView);
- } else if (numShardsProvider != null) {
- minShardsNeeded = numShardsProvider.get();
- } else {
- minShardsNeeded = 1;
- }
- int extraShardsNeeded = minShardsNeeded - results.size();
- if (extraShardsNeeded > 0) {
- LOG.info(
- "Creating {} empty output shards in addition to {} written for a total of "
- + " {}.", extraShardsNeeded, results.size(), minShardsNeeded);
- for (int i = 0; i < extraShardsNeeded; ++i) {
- Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
- writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM,
- UNKNOWN_NUMSHARDS);
- WriteT emptyWrite = writer.close();
- results.add(emptyWrite);
- }
- LOG.debug("Done creating extra shards.");
- }
- writeOperation.finalize(results, c.getPipelineOptions());
- LOG.debug("Done finalizing write operation {}", writeOperation);
- }
- }).withSideInputs(sideInputs.build()));
- }
- return PDone.in(input.getPipeline());
- }
-}
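
ApplyShardingKey above seeds each instance at a random shard and then round-robins, so the
elements of even a small PCollection spread evenly across shards. A standalone sketch of just
that assignment logic (the class name is hypothetical):

import java.util.concurrent.ThreadLocalRandom;

public class ShardingKeySketch {
  private static final int UNKNOWN_SHARDNUM = -1;
  private int shardNumber = UNKNOWN_SHARDNUM;

  // Assigns the next shard: random for the first element, then round-robin.
  int nextShard(int shardCount) {
    if (shardNumber == UNKNOWN_SHARDNUM) {
      shardNumber = ThreadLocalRandom.current().nextInt(shardCount);
    } else {
      shardNumber = (shardNumber + 1) % shardCount;
    }
    return shardNumber;
  }

  public static void main(String[] args) {
    ShardingKeySketch sketch = new ShardingKeySketch();
    for (int i = 0; i < 5; i++) {
      System.out.println("element " + i + " -> shard " + sketch.nextShard(3));
    }
  }
}
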
http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java
index 763b30a..32c36cc 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java
@@ -17,6 +17,7 @@
*/
/**
- * Transforms used to read from the Hadoop file system (HDFS).
+ * {@link org.apache.beam.sdk.io.FileSystem} implementation for any Hadoop
+ * {@link org.apache.hadoop.fs.FileSystem}.
*/
package org.apache.beam.sdk.io.hdfs;
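
With HDFSFileSource and HDFSFileSink removed, the module's role shifts to registering a Beam
FileSystem for Hadoop, so the generic file-based transforms work directly against hdfs://
paths. A hedged sketch of a read along those lines (assumes the hadoop-file-system module is
on the classpath with its Hadoop configuration wired into the pipeline options; the URI below
is a placeholder):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class HdfsReadSketch {
  public static void main(String[] args) {
    // Hadoop settings (fs.defaultFS etc.) are expected to be supplied via
    // pipeline options so the hdfs:// scheme resolves to the Hadoop FileSystem.
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    // Placeholder URI; any glob the Hadoop FileSystem understands should work.
    p.apply("ReadFromHdfs", TextIO.read().from("hdfs://namenode/data/part-*"));
    p.run().waitUntilFinish();
  }
}
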