You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/10/14 17:15:45 UTC
[1/2] incubator-beam git commit: [BEAM-55] TextIO & FileBasedSink:
Add support for compressed output
Repository: incubator-beam
Updated Branches:
refs/heads/master 49f944430 -> b7b68e6fb
[BEAM-55] TextIO & FileBasedSink: Add support for compressed output
[BEAM-55] Add customizable file-based output support through DecoratedFileSink
and concrete Gzip file-based output support through
WriterOutputGzipDecoratorFactory
[BEAM-55] Add example pipeline usage and a few eclipse-related entries to
gitignore
[BEAM-55] Move logic from DecoratedFileSink into FileBasedSink and TextIO to
allow for direct support of customizable file-based output, including built-in Gzip
and Bzip2 support, per request by @dhalperi
[BEAM-697] TextIO.Write.Bound.withHeader() and withFooter() now pass
current value of validate to returned Bound instance instead of literal 'false'
[BEAM-55] Simplified overly complex API, removed example pipeline, cleaned up
comment formatting, added small usage example to main TextIO javadoc,
per request by @dhalperi
[BEAM-55] Revert erroneous formatting, remove old comments, fix javadoc, inline
superclass only used in one unit test
[BEAM-55] Add writableByteChannelFactory to DisplayData, move fix for BEAM-697 into its own PR
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fa589ee4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fa589ee4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fa589ee4
Branch: refs/heads/master
Commit: fa589ee4e16b91a06e2b78d870b1fa70ba5834e8
Parents: 49f9444
Author: Jeffrey Scott Keone Payne <je...@gmail.com>
Authored: Thu Sep 29 12:23:44 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Oct 14 10:14:48 2016 -0700
----------------------------------------------------------------------
.gitignore | 5 +
.../org/apache/beam/sdk/io/FileBasedSink.java | 138 ++++++++++++++++++-
.../java/org/apache/beam/sdk/io/TextIO.java | 91 +++++++++---
.../sdk/io/DrunkWritableByteChannelFactory.java | 80 +++++++++++
.../apache/beam/sdk/io/FileBasedSinkTest.java | 108 +++++++++++++++
.../java/org/apache/beam/sdk/io/TextIOTest.java | 59 ++++++--
6 files changed, 447 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa589ee4/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 5390dd0..d6cffec 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,6 +17,8 @@ bin/
.project
.factorypath
.checkstyle
+.fbExcludeFilterFile
+.apt_generated/
.settings/
# The build process generates the dependency-reduced POM, but it shouldn't be
@@ -27,6 +29,9 @@ dependency-reduced-pom.xml
# produced by a text editor.
*~
+# Ignore MacOSX files.
+.DS_Store
+
# NOTE: if you modify this file, you probably need to modify the file set that
# is an input to 'maven-assembly-plugin' that generates source distribution.
# This is typically in files named 'src.xml' throughout this repository.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa589ee4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index f571d50..7e95c5b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -20,11 +20,14 @@ package org.apache.beam.sdk.io;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Strings.isNullOrEmpty;
import com.google.common.collect.Ordering;
+
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
+import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
@@ -34,6 +37,10 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
+import java.util.zip.GZIPOutputStream;
+
+import javax.annotation.Nullable;
+
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -45,6 +52,7 @@ import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,6 +77,64 @@ import org.slf4j.LoggerFactory;
*/
public abstract class FileBasedSink<T> extends Sink<T> {
/**
+ * Directly supported file output compression types.
+ */
+ public static enum CompressionType implements WritableByteChannelFactory {
+ /**
+ * No compression, or any other transformation, will be used.
+ */
+ UNCOMPRESSED("", MimeTypes.TEXT) {
+ @Override
+ public WritableByteChannel create(WritableByteChannel channel) throws IOException {
+ return channel;
+ }
+ },
+ /**
+ * Provides GZip output transformation.
+ */
+ GZIP(".gz", MimeTypes.BINARY) {
+ @Override
+ public WritableByteChannel create(WritableByteChannel channel) throws IOException {
+ return Channels.newChannel(new GZIPOutputStream(Channels.newOutputStream(channel), true));
+ }
+ },
+ /**
+ * Provides BZip2 output transformation.
+ */
+ BZIP2(".bz2", MimeTypes.BINARY) {
+ @Override
+ public WritableByteChannel create(WritableByteChannel channel) throws IOException {
+ return Channels
+ .newChannel(new BZip2CompressorOutputStream(Channels.newOutputStream(channel)));
+ }
+ };
+
+ private String filenameSuffix;
+ private String mimeType;
+
+ private CompressionType(String suffix, String mimeType) {
+ this.filenameSuffix = suffix;
+ this.mimeType = mimeType;
+ }
+
+ @Override
+ public String getFilenameSuffix() {
+ return filenameSuffix;
+ }
+
+ @Override
+ public String getMimeType() {
+ return mimeType;
+ }
+ }
+
+ /**
+ * The {@link WritableByteChannelFactory} that is used to wrap the raw data output to the
+ * underlying channel. The default, {@link CompressionType#UNCOMPRESSED}, leaves the output as-is.
+ */
+ protected final WritableByteChannelFactory writableByteChannelFactory;
+
+ /**
* Base filename for final output files.
*/
protected final String baseOutputFilename;
@@ -85,21 +151,48 @@ public abstract class FileBasedSink<T> extends Sink<T> {
protected final String fileNamingTemplate;
/**
- * Construct a FileBasedSink with the given base output filename and extension.
+ * Construct a FileBasedSink with the given base output filename and extension. A
+ * {@link WritableByteChannelFactory} of type {@link CompressionType#UNCOMPRESSED} will be used.
*/
public FileBasedSink(String baseOutputFilename, String extension) {
this(baseOutputFilename, extension, ShardNameTemplate.INDEX_OF_MAX);
}
/**
+ * Construct a FileBasedSink with the given base output filename, extension, and
+ * {@link WritableByteChannelFactory}.
+ */
+ public FileBasedSink(String baseOutputFilename, String extension,
+ WritableByteChannelFactory writableByteChannelFactory) {
+ this(baseOutputFilename, extension, ShardNameTemplate.INDEX_OF_MAX, writableByteChannelFactory);
+ }
+
+ /**
* Construct a FileBasedSink with the given base output filename, extension, and file naming
- * template.
+ * template. A {@link WritableByteChannelFactory} of type {@link CompressionType#UNCOMPRESSED}
+ * will be used.
*
* <p>See {@link ShardNameTemplate} for a description of file naming templates.
*/
public FileBasedSink(String baseOutputFilename, String extension, String fileNamingTemplate) {
+ this(baseOutputFilename, extension, fileNamingTemplate, CompressionType.UNCOMPRESSED);
+ }
+
+ /**
+ * Construct a FileBasedSink with the given base output filename, extension, file naming template,
+ * and {@link WritableByteChannelFactory}.
+ *
+ * <p>See {@link ShardNameTemplate} for a description of file naming templates.
+ */
+ public FileBasedSink(String baseOutputFilename, String extension, String fileNamingTemplate,
+ WritableByteChannelFactory writableByteChannelFactory) {
+ this.writableByteChannelFactory = writableByteChannelFactory;
this.baseOutputFilename = baseOutputFilename;
- this.extension = extension;
+ if (!isNullOrEmpty(writableByteChannelFactory.getFilenameSuffix())) {
+ this.extension = extension + getFileExtension(writableByteChannelFactory.getFilenameSuffix());
+ } else {
+ this.extension = extension;
+ }
this.fileNamingTemplate = fileNamingTemplate;
}
@@ -492,7 +585,10 @@ public abstract class FileBasedSink<T> extends Sink<T> {
filename = FileBasedWriteOperation.buildTemporaryFilename(
getWriteOperation().baseTemporaryFilename, uId);
LOG.debug("Opening {}.", filename);
- channel = IOChannelUtils.create(filename, mimeType);
+ final WritableByteChannelFactory factory =
+ getWriteOperation().getSink().writableByteChannelFactory;
+ mimeType = factory.getMimeType();
+ channel = factory.create(IOChannelUtils.create(filename, mimeType));
try {
prepareWrite(channel);
LOG.debug("Writing header to {}.", filename);
@@ -514,7 +610,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
}
/**
- * Closes the channel and return the bundle result.
+ * Closes the channel and returns the bundle result.
*/
@Override
public final FileResult close() throws Exception {
@@ -674,4 +770,36 @@ public abstract class FileBasedSink<T> extends Sink<T> {
}
}
}
+
+ /**
+ * Implementations create instances of {@link WritableByteChannel} used by {@link FileBasedSink}
+ * and related classes to allow <em>decorating</em>, or otherwise transforming, the raw data that
+ * would normally be written directly to the {@link WritableByteChannel} passed into
+ * {@link WritableByteChannelFactory#create(WritableByteChannel)}.
+ *
+ * <p>Implementations should override {@link #toString()} with something meaningful, as it is used when
+ * building {@link DisplayData}.
+ */
+ public interface WritableByteChannelFactory extends Serializable {
+ /**
+ * @param channel the {@link WritableByteChannel} to wrap
+ * @return the {@link WritableByteChannel} to be used during output
+ * @throws IOException
+ */
+ public WritableByteChannel create(WritableByteChannel channel) throws IOException;
+
+ /**
+ * @return the MIME type that should be used for the files that will hold the output data
+ * @see MimeTypes
+ * @see <a href=
+ * 'http://www.iana.org/assignments/media-types/media-types.xhtml'>http://www.iana.org/assignments/media-types/media-types.xhtml</a>
+ */
+ public String getMimeType();
+
+ /**
+ * @return an optional filename suffix, e.g., ".gz" is returned by {@link CompressionType#GZIP}
+ */
+ @Nullable
+ public String getFilenameSuffix();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa589ee4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 9d91dff..3ae2a0c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -17,12 +17,13 @@
*/
package org.apache.beam.sdk.io;
+import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
-import static org.apache.beam.sdk.io.TextIO.CompressionType.UNCOMPRESSED;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
+
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
@@ -33,11 +34,14 @@ import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.NoSuchElementException;
import java.util.regex.Pattern;
+
import javax.annotation.Nullable;
+
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
import org.apache.beam.sdk.io.Read.Bounded;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.PTransform;
@@ -102,6 +106,14 @@ import org.apache.beam.sdk.values.PDone;
* .to("gs://my_bucket/path/to/numbers")
* .withSuffix(".txt")
* .withCoder(TextualIntegerCoder.of()));
+ *
+ * // Same as above, only with Gzip compression:
+ * PCollection<Integer> numbers = ...;
+ * numbers.apply("WriteNumbers", TextIO.Write
+ * .to("gs://my_bucket/path/to/numbers")
+ * .withSuffix(".txt")
+ * .withCoder(TextualIntegerCoder.of())
+ * .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP));
* }</pre>
*/
public class TextIO {
@@ -458,6 +470,21 @@ public class TextIO {
return new Bound<>(DEFAULT_TEXT_CODER).withFooter(footer);
}
+ /**
+ * Returns a transform for writing to text files like this one but that has the given
+ * {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output. The
+ * default value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
+ *
+ * <p>A {@code null} value will reset the value to the default value mentioned above.
+ *
+ * @param writableByteChannelFactory the factory to be used during output
+ */
+ public static Bound<String> withWritableByteChannelFactory(
+ WritableByteChannelFactory writableByteChannelFactory) {
+ return new Bound<>(DEFAULT_TEXT_CODER)
+ .withWritableByteChannelFactory(writableByteChannelFactory);
+ }
+
// TODO: appendingNewlines, etc.
/**
@@ -493,13 +520,21 @@ public class TextIO {
/** An option to indicate if output validation is desired. Default is true. */
private final boolean validate;
+ /**
+ * The {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink}. Default is
+ * {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
+ */
+ private final WritableByteChannelFactory writableByteChannelFactory;
+
Bound(Coder<T> coder) {
- this(null, null, "", null, null, coder, 0, DEFAULT_SHARD_TEMPLATE, true);
+ this(null, null, "", null, null, coder, 0, DEFAULT_SHARD_TEMPLATE, true,
+ FileBasedSink.CompressionType.UNCOMPRESSED);
}
private Bound(String name, String filenamePrefix, String filenameSuffix,
@Nullable String header, @Nullable String footer, Coder<T> coder, int numShards,
- String shardTemplate, boolean validate) {
+ String shardTemplate, boolean validate,
+ WritableByteChannelFactory writableByteChannelFactory) {
super(name);
this.header = header;
this.footer = footer;
@@ -509,6 +544,8 @@ public class TextIO {
this.numShards = numShards;
this.shardTemplate = shardTemplate;
this.validate = validate;
+ this.writableByteChannelFactory =
+ firstNonNull(writableByteChannelFactory, FileBasedSink.CompressionType.UNCOMPRESSED);
}
/**
@@ -522,7 +559,7 @@ public class TextIO {
public Bound<T> to(String filenamePrefix) {
validateOutputComponent(filenamePrefix);
return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
- shardTemplate, validate);
+ shardTemplate, validate, writableByteChannelFactory);
}
/**
@@ -536,7 +573,7 @@ public class TextIO {
public Bound<T> withSuffix(String nameExtension) {
validateOutputComponent(nameExtension);
return new Bound<>(name, filenamePrefix, nameExtension, header, footer, coder, numShards,
- shardTemplate, validate);
+ shardTemplate, validate, writableByteChannelFactory);
}
/**
@@ -556,7 +593,7 @@ public class TextIO {
public Bound<T> withNumShards(int numShards) {
checkArgument(numShards >= 0);
return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
- shardTemplate, validate);
+ shardTemplate, validate, writableByteChannelFactory);
}
/**
@@ -569,7 +606,7 @@ public class TextIO {
*/
public Bound<T> withShardNameTemplate(String shardTemplate) {
return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
- shardTemplate, validate);
+ shardTemplate, validate, writableByteChannelFactory);
}
/**
@@ -587,7 +624,7 @@ public class TextIO {
*/
public Bound<T> withoutSharding() {
return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, 1, "",
- validate);
+ validate, writableByteChannelFactory);
}
/**
@@ -600,7 +637,7 @@ public class TextIO {
*/
public <X> Bound<X> withCoder(Coder<X> coder) {
return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
- shardTemplate, validate);
+ shardTemplate, validate, writableByteChannelFactory);
}
/**
@@ -615,7 +652,7 @@ public class TextIO {
*/
public Bound<T> withoutValidation() {
return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
- shardTemplate, false);
+ shardTemplate, false, writableByteChannelFactory);
}
/**
@@ -630,7 +667,7 @@ public class TextIO {
*/
public Bound<T> withHeader(@Nullable String header) {
return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
- shardTemplate, false);
+ shardTemplate, false, writableByteChannelFactory);
}
/**
@@ -645,7 +682,24 @@ public class TextIO {
*/
public Bound<T> withFooter(@Nullable String footer) {
return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
- shardTemplate, false);
+ shardTemplate, false, writableByteChannelFactory);
+ }
+
+ /**
+ * Returns a transform for writing to text files like this one but that has the given
+ * {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output.
+ * The default value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
+ *
+ * <p>A {@code null} value will reset the value to the default value mentioned above.
+ *
+ * <p>Does not modify this object.
+ *
+ * @param writableByteChannelFactory the factory to be used during output
+ */
+ public Bound<T> withWritableByteChannelFactory(
+ WritableByteChannelFactory writableByteChannelFactory) {
+ return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
+ shardTemplate, validate, writableByteChannelFactory);
}
@Override
@@ -654,11 +708,10 @@ public class TextIO {
throw new IllegalStateException(
"need to set the filename prefix of a TextIO.Write transform");
}
-
org.apache.beam.sdk.io.Write.Bound<T> write =
org.apache.beam.sdk.io.Write.to(
new TextSink<>(filenamePrefix, filenameSuffix, header, footer, shardTemplate,
- coder));
+ coder, writableByteChannelFactory));
if (getNumShards() > 0) {
write = write.withNumShards(getNumShards());
}
@@ -684,7 +737,10 @@ public class TextIO {
.addIfNotNull(DisplayData.item("fileHeader", header)
.withLabel("File Header"))
.addIfNotNull(DisplayData.item("fileFooter", footer)
- .withLabel("File Footer"));
+ .withLabel("File Footer"))
+ .add(DisplayData
+ .item("writableByteChannelFactory", writableByteChannelFactory.toString())
+ .withLabel("Compression/Transformation Type"));
}
/**
@@ -1018,8 +1074,9 @@ public class TextIO {
TextSink(
String baseOutputFilename, String extension,
@Nullable String header, @Nullable String footer,
- String fileNameTemplate, Coder<T> coder) {
- super(baseOutputFilename, extension, fileNameTemplate);
+ String fileNameTemplate, Coder<T> coder,
+ WritableByteChannelFactory writableByteChannelFactory) {
+ super(baseOutputFilename, extension, fileNameTemplate, writableByteChannelFactory);
this.coder = coder;
this.header = header;
this.footer = footer;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa589ee4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java
new file mode 100644
index 0000000..79f0996
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
+import org.apache.beam.sdk.util.MimeTypes;
+
+/**
+ * {@link WritableByteChannelFactory} implementation useful for testing that creates a
+ * {@link WritableByteChannel} that writes everything twice.
+ */
+public class DrunkWritableByteChannelFactory implements WritableByteChannelFactory {
+ @Override
+ public WritableByteChannel create(WritableByteChannel channel) throws IOException {
+ return new DrunkWritableByteChannel(channel);
+ }
+
+ @Override
+ public String getMimeType() {
+ return MimeTypes.TEXT;
+ }
+
+ @Override
+ public String getFilenameSuffix() {
+ return ".drunk";
+ }
+
+ @Override
+ public String toString() {
+ return "DRUNK";
+ }
+
+ /**
+ * WritableByteChannel that writes everything twice.
+ */
+ private static class DrunkWritableByteChannel implements WritableByteChannel {
+ protected final WritableByteChannel channel;
+
+ public DrunkWritableByteChannel(final WritableByteChannel channel) {
+ this.channel = channel;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return channel.isOpen();
+ }
+
+ @Override
+ public void close() throws IOException {
+ channel.close();
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ final int w1 = channel.write(src);
+ src.rewind();
+ final int w2 = channel.write(src);
+ return w1 + w2;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa589ee4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 0fdb11f..66bb661 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -26,20 +26,32 @@ import static org.junit.Assert.fail;
import java.io.BufferedReader;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.beam.sdk.io.FileBasedSink.CompressionType;
import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation;
import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation.TemporaryFileRetention;
+import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter;
import org.apache.beam.sdk.io.FileBasedSink.FileResult;
+import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -419,6 +431,97 @@ public class FileBasedSinkTest {
}
/**
+ * {@link CompressionType#BZIP2} correctly writes Bzip2-compressed data.
+ */
+ @Test
+ public void testCompressionTypeBZIP2() throws FileNotFoundException, IOException {
+ final File file =
+ writeValuesWithWritableByteChannelFactory(CompressionType.BZIP2, "abc", "123");
+ // Read Bzip2ed data back in using Apache commons API (de facto standard).
+ assertReadValues(new BufferedReader(new InputStreamReader(
+ new BZip2CompressorInputStream(new FileInputStream(file)), StandardCharsets.UTF_8.name())),
+ "abc", "123");
+ }
+
+ /**
+ * {@link CompressionType#GZIP} correctly writes Gzipped data.
+ */
+ @Test
+ public void testCompressionTypeGZIP() throws FileNotFoundException, IOException {
+ final File file = writeValuesWithWritableByteChannelFactory(CompressionType.GZIP, "abc", "123");
+ // Read Gzipped data back in using standard API.
+ assertReadValues(new BufferedReader(new InputStreamReader(
+ new GZIPInputStream(new FileInputStream(file)), StandardCharsets.UTF_8.name())), "abc",
+ "123");
+ }
+
+ /**
+ * {@link CompressionType#UNCOMPRESSED} correctly writes uncompressed data.
+ */
+ @Test
+ public void testCompressionTypeUNCOMPRESSED() throws FileNotFoundException, IOException {
+ final File file =
+ writeValuesWithWritableByteChannelFactory(CompressionType.UNCOMPRESSED, "abc", "123");
+ // Read uncompressed data back in using standard API.
+ assertReadValues(new BufferedReader(new InputStreamReader(
+ new FileInputStream(file), StandardCharsets.UTF_8.name())), "abc",
+ "123");
+ }
+
+ private void assertReadValues(final BufferedReader br, String... values) throws IOException {
+ try (final BufferedReader _br = br) {
+ for (String value : values) {
+ assertEquals(String.format("Line should read '%s'", value), value, _br.readLine());
+ }
+ }
+ }
+
+ private File writeValuesWithWritableByteChannelFactory(final WritableByteChannelFactory factory,
+ String... values)
+ throws IOException, FileNotFoundException {
+ final File file = tmpFolder.newFile("test.gz");
+ final WritableByteChannel channel =
+ factory.create(Channels.newChannel(new FileOutputStream(file)));
+ for (String value : values) {
+ channel.write(ByteBuffer.wrap((value + "\n").getBytes(StandardCharsets.UTF_8)));
+ }
+ channel.close();
+ return file;
+ }
+
+ /**
+ * {@link FileBasedWriter} writes to the {@link WritableByteChannel} provided by
+ * {@link DrunkWritableByteChannelFactory}.
+ */
+ @Test
+ public void testFileBasedWriterWithWritableByteChannelFactory() throws Exception {
+ final String testUid = "testId";
+ final String expectedFilename =
+ getBaseOutputFilename() + FileBasedWriteOperation.TEMPORARY_FILENAME_SEPARATOR + testUid;
+ final FileBasedWriter<String> writer =
+ new SimpleSink(getBaseOutputFilename(), "txt", new DrunkWritableByteChannelFactory())
+ .createWriteOperation(null).createWriter(null);
+
+ final List<String> expected = new ArrayList<>();
+ expected.add("header");
+ expected.add("header");
+ expected.add("a");
+ expected.add("a");
+ expected.add("b");
+ expected.add("b");
+ expected.add("footer");
+ expected.add("footer");
+
+ writer.open(testUid);
+ writer.write("a");
+ writer.write("b");
+ final FileResult result = writer.close();
+
+ assertEquals(expectedFilename, result.getFilename());
+ assertFileContains(expected, expectedFilename);
+ }
+
+ /**
* A simple FileBasedSink that writes String values as lines with header and footer lines.
*/
private static final class SimpleSink extends FileBasedSink<String> {
@@ -426,6 +529,11 @@ public class FileBasedSinkTest {
super(baseOutputFilename, extension);
}
+ public SimpleSink(String baseOutputFilename, String extension,
+ WritableByteChannelFactory writableByteChannelFactory) {
+ super(baseOutputFilename, extension, writableByteChannelFactory);
+ }
+
public SimpleSink(String baseOutputFilename, String extension, String fileNamingTemplate) {
super(baseOutputFilename, extension, fileNamingTemplate);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa589ee4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index fdfb652..2131ece 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io;
import static org.apache.beam.sdk.TestUtils.INTS_ARRAY;
+import static org.apache.beam.sdk.TestUtils.LINES2_ARRAY;
import static org.apache.beam.sdk.TestUtils.LINES_ARRAY;
import static org.apache.beam.sdk.TestUtils.NO_INTS_ARRAY;
import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY;
@@ -47,6 +48,7 @@ import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
@@ -70,13 +72,16 @@ import java.util.Set;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
+
import javax.annotation.Nullable;
+
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.TextualIntegerCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
import org.apache.beam.sdk.io.TextIO.CompressionType;
import org.apache.beam.sdk.io.TextIO.TextSource;
import org.apache.beam.sdk.options.GcsOptions;
@@ -170,7 +175,7 @@ public class TextIOTest {
@BeforeClass
public static void setupClass() throws IOException {
IOChannelUtils.registerStandardIOFactories(TestPipeline.testingPipelineOptions());
- tempFolder = Files.createTempDirectory("TextIOTest");
+ tempFolder = Files.createTempDirectory("TextIOTest");
// empty files
emptyTxt = writeToFile(EMPTY, "empty.txt", CompressionType.UNCOMPRESSED);
emptyGz = writeToFile(EMPTY, "empty.gz", GZIP);
@@ -261,7 +266,7 @@ public class TextIOTest {
@Test
@Category(NeedsRunner.class)
public void testReadNulls() throws Exception {
- runTestRead(new Void[]{null, null, null}, VoidCoder.of());
+ runTestRead(new Void[] {null, null, null}, VoidCoder.of());
}
@Test
@@ -342,6 +347,7 @@ public class TextIOTest {
} else if (numShards > 0) {
write = write.withNumShards(numShards).withShardNameTemplate(ShardNameTemplate.INDEX_OF_MAX);
}
+
input.apply(write);
p.run();
@@ -413,7 +419,7 @@ public class TextIOTest {
}
private static Function<List<String>, List<String>> removeHeaderAndFooter(final String header,
- final String footer) {
+ final String footer) {
return new Function<List<String>, List<String>>() {
@Nullable
@Override
@@ -498,6 +504,36 @@ public class TextIOTest {
}
@Test
+ @Category(NeedsRunner.class)
+ public void testWriteWithWritableByteChannelFactory() throws Exception {
+ Coder<String> coder = StringUtf8Coder.of();
+ String outputName = "file.txt";
+ Path baseDir = Files.createTempDirectory(tempFolder, "testwrite");
+ Pipeline p = TestPipeline.create();
+
+ PCollection<String> input = p.apply(Create.of(Arrays.asList(LINES2_ARRAY)).withCoder(coder));
+
+ final WritableByteChannelFactory writableByteChannelFactory =
+ new DrunkWritableByteChannelFactory();
+ TextIO.Write.Bound<String> write = TextIO.Write.to(baseDir.resolve(outputName).toString())
+ .withoutSharding().withWritableByteChannelFactory(writableByteChannelFactory);
+ DisplayData displayData = DisplayData.from(write);
+ assertThat(displayData, hasDisplayItem("writableByteChannelFactory", "DRUNK"));
+
+ input.apply(write);
+
+ p.run();
+
+ final List<String> drunkElems = new ArrayList<>(LINES2_ARRAY.length * 2 + 2);
+ for (String elem : LINES2_ARRAY) {
+ drunkElems.add(elem + elem);
+ drunkElems.add("");
+ }
+ assertOutputFiles(drunkElems.toArray(new String[0]), null, null, coder, 1, baseDir,
+ outputName + writableByteChannelFactory.getFilenameSuffix(), write.getShardNameTemplate());
+ }
+
+ @Test
public void testWriteDisplayData() {
TextIO.Write.Bound<?> write = TextIO.Write
.to("foo")
@@ -517,6 +553,7 @@ public class TextIOTest {
assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-"));
assertThat(displayData, hasDisplayItem("numShards", 100));
assertThat(displayData, hasDisplayItem("validation", false));
+ assertThat(displayData, hasDisplayItem("writableByteChannelFactory", "UNCOMPRESSED"));
}
@Test
@@ -638,9 +675,9 @@ public class TextIOTest {
}
/**
- * Tests reading from a small, uncompressed file with .gz extension.
- * This must work in AUTO or GZIP modes. This is needed because some network file systems / HTTP
- * clients will transparently decompress gzipped content.
+ * Tests reading from a small, uncompressed file with .gz extension. This must work in AUTO or
+ * GZIP modes. This is needed because some network file systems / HTTP clients will transparently
+ * decompress gzipped content.
*/
@Test
@Category(NeedsRunner.class)
@@ -672,9 +709,7 @@ public class TextIOTest {
* @return The zip filename.
* @throws Exception In case of a failure during zip file creation.
*/
- private String createZipFile(List<String> expected, String filename, String[]
- ...
- fieldsEntries)
+ private String createZipFile(List<String> expected, String filename, String[]... fieldsEntries)
throws Exception {
File tmpFile = tempFolder.resolve(filename).toFile();
String tmpFileName = tmpFile.getPath();
@@ -703,7 +738,7 @@ public class TextIOTest {
@Category(NeedsRunner.class)
public void testTxtRead() throws Exception {
// Files with non-compressed extensions should work in AUTO and UNCOMPRESSED modes.
- for (CompressionType type : new CompressionType[] { AUTO, UNCOMPRESSED }) {
+ for (CompressionType type : new CompressionType[]{AUTO, UNCOMPRESSED}) {
assertReadingCompressedFileMatchesExpected(emptyTxt, type, EMPTY);
assertReadingCompressedFileMatchesExpected(tinyTxt, type, TINY);
assertReadingCompressedFileMatchesExpected(largeTxt, type, LARGE);
@@ -714,7 +749,7 @@ public class TextIOTest {
@Category(NeedsRunner.class)
public void testGzipCompressedRead() throws Exception {
// Files with the right extensions should work in AUTO and GZIP modes.
- for (CompressionType type : new CompressionType[] { AUTO, GZIP }) {
+ for (CompressionType type : new CompressionType[]{AUTO, GZIP}) {
assertReadingCompressedFileMatchesExpected(emptyGz, type, EMPTY);
assertReadingCompressedFileMatchesExpected(tinyGz, type, TINY);
assertReadingCompressedFileMatchesExpected(largeGz, type, LARGE);
@@ -732,7 +767,7 @@ public class TextIOTest {
@Category(NeedsRunner.class)
public void testBzip2CompressedRead() throws Exception {
// Files with the right extensions should work in AUTO and BZIP2 modes.
- for (CompressionType type : new CompressionType[] { AUTO, BZIP2 }) {
+ for (CompressionType type : new CompressionType[]{AUTO, BZIP2}) {
assertReadingCompressedFileMatchesExpected(emptyBzip2, type, EMPTY);
assertReadingCompressedFileMatchesExpected(tinyBzip2, type, TINY);
assertReadingCompressedFileMatchesExpected(largeBzip2, type, LARGE);
[2/2] incubator-beam git commit: Closes #1027
Posted by dh...@apache.org.
Closes #1027
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b7b68e6f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b7b68e6f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b7b68e6f
Branch: refs/heads/master
Commit: b7b68e6fb1aafb6b4160e5dcea022bf6c802e33f
Parents: 49f9444 fa589ee
Author: Dan Halperin <dh...@google.com>
Authored: Fri Oct 14 10:15:36 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Oct 14 10:15:36 2016 -0700
----------------------------------------------------------------------
.gitignore | 5 +
.../org/apache/beam/sdk/io/FileBasedSink.java | 138 ++++++++++++++++++-
.../java/org/apache/beam/sdk/io/TextIO.java | 91 +++++++++---
.../sdk/io/DrunkWritableByteChannelFactory.java | 80 +++++++++++
.../apache/beam/sdk/io/FileBasedSinkTest.java | 108 +++++++++++++++
.../java/org/apache/beam/sdk/io/TextIOTest.java | 59 ++++++--
6 files changed, 447 insertions(+), 34 deletions(-)
----------------------------------------------------------------------