You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/31 00:47:40 UTC
[2/3] beam git commit: Adds a canonical Compression enum for
file-based IOs
Adds a canonical Compression enum for file-based IOs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/54489f0d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/54489f0d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/54489f0d
Branch: refs/heads/master
Commit: 54489f0d52e354d8233bf297cce6ce451a05f6a5
Parents: afe8b0e
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Aug 18 16:17:20 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Wed Aug 30 17:40:52 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroSink.java | 2 +-
.../apache/beam/sdk/io/CompressedSource.java | 292 ++++++-------------
.../org/apache/beam/sdk/io/Compression.java | 228 +++++++++++++++
.../org/apache/beam/sdk/io/FileBasedSink.java | 113 +++----
.../java/org/apache/beam/sdk/io/TFRecordIO.java | 153 ++++------
.../java/org/apache/beam/sdk/io/TextIO.java | 178 +++++------
.../beam/sdk/io/CompressedSourceTest.java | 17 +-
.../apache/beam/sdk/io/FileBasedSinkTest.java | 41 ++-
.../java/org/apache/beam/sdk/io/SimpleSink.java | 23 +-
.../org/apache/beam/sdk/io/TFRecordIOTest.java | 35 ++-
.../org/apache/beam/sdk/io/TextIOReadTest.java | 81 +++--
.../org/apache/beam/sdk/io/WriteFilesTest.java | 9 +-
.../java/org/apache/beam/sdk/io/xml/XmlIO.java | 96 +++---
13 files changed, 672 insertions(+), 596 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
index acd3ea6..888db85 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
@@ -40,7 +40,7 @@ class AvroSink<UserT, DestinationT, OutputT> extends FileBasedSink<UserT, Destin
DynamicAvroDestinations<UserT, DestinationT, OutputT> dynamicDestinations,
boolean genericRecords) {
// Avro handle compression internally using the codec.
- super(outputPrefix, dynamicDestinations, CompressionType.UNCOMPRESSED);
+ super(outputPrefix, dynamicDestinations, Compression.UNCOMPRESSED);
this.dynamicDestinations = dynamicDestinations;
this.genericRecords = genericRecords;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index 6943a02..ae55d80 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -20,28 +20,17 @@ package org.apache.beam.sdk.io;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import com.google.common.io.ByteStreams;
-import com.google.common.primitives.Ints;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.PushbackInputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.NoSuchElementException;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipInputStream;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
-import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream;
-import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.joda.time.Instant;
/**
@@ -54,21 +43,20 @@ import org.joda.time.Instant;
* FileBasedSource<T> mySource = ...;
* PCollection<T> collection = p.apply(Read.from(CompressedSource
* .from(mySource)
- * .withDecompression(CompressedSource.CompressionMode.GZIP)));
+ * .withCompression(Compression.GZIP)));
* } </pre>
*
- * <p>Supported compression algorithms are {@link CompressionMode#GZIP},
- * {@link CompressionMode#BZIP2}, {@link CompressionMode#ZIP} and {@link CompressionMode#DEFLATE}.
- * User-defined compression types are supported by implementing
+ * <p>Supported compression algorithms are {@link Compression#GZIP},
+ * {@link Compression#BZIP2}, {@link Compression#ZIP} and {@link Compression#DEFLATE}.
+ * User-defined compression types are supported by implementing a
* {@link DecompressingChannelFactory}.
*
* <p>By default, the compression algorithm is selected from those supported in
- * {@link CompressionMode} based on the file name provided to the source, namely
- * {@code ".bz2"} indicates {@link CompressionMode#BZIP2}, {@code ".gz"} indicates
- * {@link CompressionMode#GZIP}, {@code ".zip"} indicates {@link CompressionMode#ZIP} and
- * {@code ".deflate"} indicates {@link CompressionMode#DEFLATE}. If the file name does not match
- * any of the supported
- * algorithms, it is assumed to be uncompressed data.
+ * {@link Compression} based on the file name provided to the source, namely
+ * {@code ".bz2"} indicates {@link Compression#BZIP2}, {@code ".gz"} indicates
+ * {@link Compression#GZIP}, {@code ".zip"} indicates {@link Compression#ZIP} and
+ * {@code ".deflate"} indicates {@link Compression#DEFLATE}. If the file name does not match
+ * any of the supported algorithms, it is assumed to be uncompressed data.
*
* @param <T> The type to read from the compressed file.
*/
@@ -85,197 +73,75 @@ public class CompressedSource<T> extends FileBasedSource<T> {
throws IOException;
}
- /**
- * Factory interface for creating channels that decompress the content of an underlying channel,
- * based on both the channel and the file name.
- */
- private interface FileNameBasedDecompressingChannelFactory
- extends DecompressingChannelFactory {
- /**
- * Given a channel, create a channel that decompresses the content read from the channel.
- */
- ReadableByteChannel createDecompressingChannel(String fileName, ReadableByteChannel channel)
- throws IOException;
- }
-
- /**
- * Default compression types supported by the {@code CompressedSource}.
- */
+ /** @deprecated Use {@link Compression} instead */
+ @Deprecated
public enum CompressionMode implements DecompressingChannelFactory {
- /**
- * Reads a byte channel assuming it is compressed with gzip.
- */
- GZIP {
- @Override
- public boolean matches(String fileName) {
- return fileName.toLowerCase().endsWith(".gz");
- }
-
- @Override
- public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
- throws IOException {
- // Determine if the input stream is gzipped. The input stream returned from the
- // GCS connector may already be decompressed; GCS does this based on the
- // content-encoding property.
- PushbackInputStream stream = new PushbackInputStream(Channels.newInputStream(channel), 2);
- byte[] headerBytes = new byte[2];
- int bytesRead = ByteStreams.read(
- stream /* source */, headerBytes /* dest */, 0 /* offset */, 2 /* len */);
- stream.unread(headerBytes, 0, bytesRead);
- if (bytesRead >= 2) {
- byte zero = 0x00;
- int header = Ints.fromBytes(zero, zero, headerBytes[1], headerBytes[0]);
- if (header == GZIPInputStream.GZIP_MAGIC) {
- return Channels.newChannel(new GzipCompressorInputStream(stream, true));
- }
- }
- return Channels.newChannel(stream);
- }
- },
-
- /**
- * Reads a byte channel assuming it is compressed with bzip2.
- */
- BZIP2 {
- @Override
- public boolean matches(String fileName) {
- return fileName.toLowerCase().endsWith(".bz2");
- }
+ /** @see Compression#UNCOMPRESSED */
+ UNCOMPRESSED(Compression.UNCOMPRESSED),
- @Override
- public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
- throws IOException {
- return Channels.newChannel(
- new BZip2CompressorInputStream(Channels.newInputStream(channel), true));
- }
- },
+ /** @see Compression#AUTO */
+ AUTO(Compression.AUTO),
- /**
- * Reads a byte channel assuming it is compressed with zip.
- * If the zip file contains multiple entries, files in the zip are concatenated all together.
- */
- ZIP {
- @Override
- public boolean matches(String fileName) {
- return fileName.toLowerCase().endsWith(".zip");
- }
+ /** @see Compression#GZIP */
+ GZIP(Compression.GZIP),
- public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
- throws IOException {
- FullZipInputStream zip = new FullZipInputStream(Channels.newInputStream(channel));
- return Channels.newChannel(zip);
- }
- },
+ /** @see Compression#BZIP2 */
+ BZIP2(Compression.BZIP2),
- /**
- * Reads a byte channel assuming it is compressed with deflate.
- */
- DEFLATE {
- @Override
- public boolean matches(String fileName) {
- return fileName.toLowerCase().endsWith(".deflate");
- }
-
- public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
- throws IOException {
- return Channels.newChannel(
- new DeflateCompressorInputStream(Channels.newInputStream(channel)));
- }
- };
+ /** @see Compression#ZIP */
+ ZIP(Compression.ZIP),
- /**
- * Extend of {@link ZipInputStream} to automatically read all entries in the zip.
- */
- private static class FullZipInputStream extends InputStream {
+ /** @see Compression#DEFLATE */
+ DEFLATE(Compression.DEFLATE);
- private ZipInputStream zipInputStream;
- private ZipEntry currentEntry;
-
- public FullZipInputStream(InputStream is) throws IOException {
- super();
- zipInputStream = new ZipInputStream(is);
- currentEntry = zipInputStream.getNextEntry();
- }
-
- @Override
- public int read() throws IOException {
- int result = zipInputStream.read();
- while (result == -1) {
- currentEntry = zipInputStream.getNextEntry();
- if (currentEntry == null) {
- return -1;
- } else {
- result = zipInputStream.read();
- }
- }
- return result;
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- int result = zipInputStream.read(b, off, len);
- while (result == -1) {
- currentEntry = zipInputStream.getNextEntry();
- if (currentEntry == null) {
- return -1;
- } else {
- result = zipInputStream.read(b, off, len);
- }
- }
- return result;
- }
+ private Compression canonical;
+ CompressionMode(Compression canonical) {
+ this.canonical = canonical;
}
/**
* Returns {@code true} if the given file name implies that the contents are compressed
* according to the compression embodied by this factory.
*/
- public abstract boolean matches(String fileName);
+ public boolean matches(String fileName) {
+ return canonical.matches(fileName);
+ }
@Override
- public abstract ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
- throws IOException;
+ public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
+ throws IOException {
+ return canonical.readDecompressed(channel);
+ }
/** Returns whether the file's extension matches of one of the known compression formats. */
public static boolean isCompressed(String filename) {
- for (CompressionMode type : CompressionMode.values()) {
- if (type.matches(filename)) {
- return true;
- }
- }
- return false;
+ return Compression.AUTO.isCompressed(filename);
}
- }
- /**
- * Reads a byte channel detecting compression according to the file name. If the filename
- * is not any other known {@link CompressionMode}, it is presumed to be uncompressed.
- */
- private static class DecompressAccordingToFilename
- implements FileNameBasedDecompressingChannelFactory {
+ static DecompressingChannelFactory fromCanonical(Compression compression) {
+ switch (compression) {
+ case AUTO:
+ return AUTO;
- @Override
- public ReadableByteChannel createDecompressingChannel(
- String fileName, ReadableByteChannel channel) throws IOException {
- for (CompressionMode type : CompressionMode.values()) {
- if (type.matches(fileName)) {
- return type.createDecompressingChannel(channel);
- }
- }
- // Uncompressed
- return channel;
- }
+ case UNCOMPRESSED:
+ return UNCOMPRESSED;
- @Override
- public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel) {
- throw new UnsupportedOperationException(
- String.format("%s does not support createDecompressingChannel(%s) but only"
- + " createDecompressingChannel(%s,%s)",
- getClass().getSimpleName(),
- String.class.getSimpleName(),
- ReadableByteChannel.class.getSimpleName(),
- ReadableByteChannel.class.getSimpleName()));
+ case GZIP:
+ return GZIP;
+
+ case BZIP2:
+ return BZIP2;
+
+ case ZIP:
+ return ZIP;
+
+ case DEFLATE:
+ return DEFLATE;
+
+ default:
+ throw new IllegalArgumentException("Unsupported compression type: " + compression);
+ }
}
}
@@ -288,7 +154,7 @@ public class CompressedSource<T> extends FileBasedSource<T> {
* configured via {@link CompressedSource#withDecompression}.
*/
public static <T> CompressedSource<T> from(FileBasedSource<T> sourceDelegate) {
- return new CompressedSource<>(sourceDelegate, new DecompressAccordingToFilename());
+ return new CompressedSource<>(sourceDelegate, CompressionMode.AUTO);
}
/**
@@ -299,6 +165,11 @@ public class CompressedSource<T> extends FileBasedSource<T> {
return new CompressedSource<>(this.sourceDelegate, channelFactory);
}
+ /** Like {@link #withDecompression} but takes a canonical {@link Compression}. */
+ public CompressedSource<T> withCompression(Compression compression) {
+ return withDecompression(CompressionMode.fromCanonical(compression));
+ }
+
/**
* Creates a {@code CompressedSource} from a delegate file based source and a decompressing
* channel factory.
@@ -359,10 +230,21 @@ public class CompressedSource<T> extends FileBasedSource<T> {
* from the requested file name that the file is not compressed.
*/
@Override
- protected final boolean isSplittable() throws Exception {
- return channelFactory instanceof FileNameBasedDecompressingChannelFactory
- && !CompressionMode.isCompressed(getFileOrPatternSpec())
- && sourceDelegate.isSplittable();
+ protected final boolean isSplittable() {
+ try {
+ if (!sourceDelegate.isSplittable()) {
+ return false;
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ if (channelFactory == CompressionMode.UNCOMPRESSED) {
+ return true;
+ }
+ if (channelFactory == CompressionMode.AUTO) {
+ return !Compression.AUTO.isCompressed(getFileOrPatternSpec());
+ }
+ return false;
}
/**
@@ -375,10 +257,8 @@ public class CompressedSource<T> extends FileBasedSource<T> {
*/
@Override
protected final FileBasedReader<T> createSingleFileReader(PipelineOptions options) {
- if (channelFactory instanceof FileNameBasedDecompressingChannelFactory) {
- if (!CompressionMode.isCompressed(getFileOrPatternSpec())) {
- return sourceDelegate.createSingleFileReader(options);
- }
+ if (isSplittable()) {
+ return sourceDelegate.createSingleFileReader(options);
}
return new CompressedReader<T>(
this, sourceDelegate.createSingleFileReader(options));
@@ -423,19 +303,19 @@ public class CompressedSource<T> extends FileBasedSource<T> {
public static class CompressedReader<T> extends FileBasedReader<T> {
private final FileBasedReader<T> readerDelegate;
- private final CompressedSource<T> source;
private final Object progressLock = new Object();
@GuardedBy("progressLock")
private int numRecordsRead;
@GuardedBy("progressLock")
private CountingChannel channel;
+ private DecompressingChannelFactory channelFactory;
/**
* Create a {@code CompressedReader} from a {@code CompressedSource} and delegate reader.
*/
public CompressedReader(CompressedSource<T> source, FileBasedReader<T> readerDelegate) {
super(source);
- this.source = source;
+ this.channelFactory = source.getChannelFactory();
this.readerDelegate = readerDelegate;
}
@@ -525,14 +405,12 @@ public class CompressedSource<T> extends FileBasedSource<T> {
channel = this.channel;
}
- if (source.getChannelFactory() instanceof FileNameBasedDecompressingChannelFactory) {
- FileNameBasedDecompressingChannelFactory channelFactory =
- (FileNameBasedDecompressingChannelFactory) source.getChannelFactory();
- readerDelegate.startReading(channelFactory.createDecompressingChannel(
- getCurrentSource().getFileOrPatternSpec(),
- channel));
+ if (channelFactory == CompressionMode.AUTO) {
+ readerDelegate.startReading(
+ Compression.detect(getCurrentSource().getFileOrPatternSpec())
+ .readDecompressed(channel));
} else {
- readerDelegate.startReading(source.getChannelFactory().createDecompressingChannel(
+ readerDelegate.startReading(channelFactory.createDecompressingChannel(
channel));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Compression.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Compression.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Compression.java
new file mode 100644
index 0000000..bb40ed4
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Compression.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.primitives.Ints;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PushbackInputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
+import java.util.List;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
+import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream;
+import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+
+/** Various compression types for reading/writing files. */
+public enum Compression {
+ /**
+ * When reading a file, automatically determine the compression type based on filename extension.
+ * Not applicable when writing files.
+ */
+ AUTO("") {
+ @Override
+ public ReadableByteChannel readDecompressed(ReadableByteChannel channel) {
+ throw new UnsupportedOperationException(
+ "Must resolve compression into a concrete value before calling readDecompressed()");
+ }
+
+ @Override
+ public WritableByteChannel writeCompressed(WritableByteChannel channel) {
+ throw new UnsupportedOperationException("AUTO is applicable only to reading files");
+ }
+ },
+
+ /** No compression. */
+ UNCOMPRESSED("") {
+ @Override
+ public ReadableByteChannel readDecompressed(ReadableByteChannel channel) {
+ return channel;
+ }
+
+ @Override
+ public WritableByteChannel writeCompressed(WritableByteChannel channel) {
+ return channel;
+ }
+ },
+
+ /** GZip compression. */
+ GZIP(".gz", ".gz") {
+ @Override
+ public ReadableByteChannel readDecompressed(ReadableByteChannel channel) throws IOException {
+ // Determine if the input stream is gzipped. The input stream returned from the
+ // GCS connector may already be decompressed; GCS does this based on the
+ // content-encoding property.
+ PushbackInputStream stream = new PushbackInputStream(Channels.newInputStream(channel), 2);
+ byte[] headerBytes = new byte[2];
+ int bytesRead =
+ ByteStreams.read(
+ stream /* source */, headerBytes /* dest */, 0 /* offset */, 2 /* len */);
+ stream.unread(headerBytes, 0, bytesRead);
+ if (bytesRead >= 2) {
+ byte zero = 0x00;
+ int header = Ints.fromBytes(zero, zero, headerBytes[1], headerBytes[0]);
+ if (header == GZIPInputStream.GZIP_MAGIC) {
+ return Channels.newChannel(new GzipCompressorInputStream(stream, true));
+ }
+ }
+ return Channels.newChannel(stream);
+ }
+
+ @Override
+ public WritableByteChannel writeCompressed(WritableByteChannel channel) throws IOException {
+ return Channels.newChannel(new GZIPOutputStream(Channels.newOutputStream(channel), true));
+ }
+ },
+
+ /** BZip compression. */
+ BZIP2(".bz2", ".bz2") {
+ @Override
+ public ReadableByteChannel readDecompressed(ReadableByteChannel channel) throws IOException {
+ return Channels.newChannel(
+ new BZip2CompressorInputStream(Channels.newInputStream(channel), true));
+ }
+
+ @Override
+ public WritableByteChannel writeCompressed(WritableByteChannel channel) throws IOException {
+ return Channels.newChannel(
+ new BZip2CompressorOutputStream(Channels.newOutputStream(channel)));
+ }
+ },
+
+ /** Zip compression. */
+ ZIP(".zip", ".zip") {
+ @Override
+ public ReadableByteChannel readDecompressed(ReadableByteChannel channel) throws IOException {
+ FullZipInputStream zip = new FullZipInputStream(Channels.newInputStream(channel));
+ return Channels.newChannel(zip);
+ }
+
+ @Override
+ public WritableByteChannel writeCompressed(WritableByteChannel channel) throws IOException {
+ throw new UnsupportedOperationException("Writing ZIP files is currently unsupported");
+ }
+ },
+
+ /** Deflate compression. */
+ DEFLATE(".deflate", ".deflate", ".zlib") {
+ @Override
+ public ReadableByteChannel readDecompressed(ReadableByteChannel channel) throws IOException {
+ return Channels.newChannel(
+ new DeflateCompressorInputStream(Channels.newInputStream(channel)));
+ }
+
+ @Override
+ public WritableByteChannel writeCompressed(WritableByteChannel channel) throws IOException {
+ return Channels.newChannel(
+ new DeflateCompressorOutputStream(Channels.newOutputStream(channel)));
+ }
+ };
+
+ private final String suggestedSuffix;
+ private final List<String> detectedSuffixes;
+
+ Compression(String suggestedSuffix, String... detectedSuffixes) {
+ this.suggestedSuffix = suggestedSuffix;
+ this.detectedSuffixes = Arrays.asList(detectedSuffixes);
+ }
+
+ public String getSuggestedSuffix() {
+ return suggestedSuffix;
+ }
+
+ public boolean matches(String filename) {
+ for (String suffix : detectedSuffixes) {
+ if (filename.toLowerCase().endsWith(suffix)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean isCompressed(String filename) {
+ Compression compression = this;
+ if (compression == AUTO) {
+ compression = detect(filename);
+ }
+ return compression != UNCOMPRESSED;
+ }
+
+ public static Compression detect(String filename) {
+ for (Compression value : values()) {
+ if (value.matches(filename)) {
+ return value;
+ }
+ }
+ return UNCOMPRESSED;
+ }
+
+ public abstract ReadableByteChannel readDecompressed(ReadableByteChannel channel)
+ throws IOException;
+
+ public abstract WritableByteChannel writeCompressed(WritableByteChannel channel)
+ throws IOException;
+
+ /** Concatenates all {@link ZipInputStream}s contained within the zip file. */
+ private static class FullZipInputStream extends InputStream {
+ private ZipInputStream zipInputStream;
+ private ZipEntry currentEntry;
+
+ public FullZipInputStream(InputStream is) throws IOException {
+ super();
+ zipInputStream = new ZipInputStream(is);
+ currentEntry = zipInputStream.getNextEntry();
+ }
+
+ @Override
+ public int read() throws IOException {
+ int result = zipInputStream.read();
+ while (result == -1) {
+ currentEntry = zipInputStream.getNextEntry();
+ if (currentEntry == null) {
+ return -1;
+ } else {
+ result = zipInputStream.read();
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ int result = zipInputStream.read(b, off, len);
+ while (result == -1) {
+ currentEntry = zipInputStream.getNextEntry();
+ if (currentEntry == null) {
+ return -1;
+ } else {
+ result = zipInputStream.read(b, off, len);
+ }
+ }
+ return result;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index d618647..39f7868 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -36,7 +36,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
-import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Arrays;
@@ -47,7 +46,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.zip.GZIPOutputStream;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -79,8 +77,6 @@ import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors.TypeVariableExtractor;
-import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
-import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
@@ -128,56 +124,66 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
implements Serializable, HasDisplayData {
private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);
- /** Directly supported file output compression types. */
+ /** @deprecated use {@link Compression}. */
+ @Deprecated
public enum CompressionType implements WritableByteChannelFactory {
- /** No compression, or any other transformation, will be used. */
- UNCOMPRESSED("", null) {
- @Override
- public WritableByteChannel create(WritableByteChannel channel) throws IOException {
- return channel;
- }
- },
- /** Provides GZip output transformation. */
- GZIP(".gz", MimeTypes.BINARY) {
- @Override
- public WritableByteChannel create(WritableByteChannel channel) throws IOException {
- return Channels.newChannel(new GZIPOutputStream(Channels.newOutputStream(channel), true));
- }
- },
- /** Provides BZip2 output transformation. */
- BZIP2(".bz2", MimeTypes.BINARY) {
- @Override
- public WritableByteChannel create(WritableByteChannel channel) throws IOException {
- return Channels.newChannel(
- new BZip2CompressorOutputStream(Channels.newOutputStream(channel)));
- }
- },
- /** Provides deflate output transformation. */
- DEFLATE(".deflate", MimeTypes.BINARY) {
- @Override
- public WritableByteChannel create(WritableByteChannel channel) throws IOException {
- return Channels.newChannel(
- new DeflateCompressorOutputStream(Channels.newOutputStream(channel)));
- }
- };
+ /** @see Compression#UNCOMPRESSED */
+ UNCOMPRESSED(Compression.UNCOMPRESSED),
- private String filenameSuffix;
- @Nullable private String mimeType;
+ /** @see Compression#GZIP */
+ GZIP(Compression.GZIP),
- CompressionType(String suffix, @Nullable String mimeType) {
- this.filenameSuffix = suffix;
- this.mimeType = mimeType;
+ /** @see Compression#BZIP2 */
+ BZIP2(Compression.BZIP2),
+
+ /** @see Compression#DEFLATE */
+ DEFLATE(Compression.DEFLATE);
+
+ private Compression canonical;
+
+ CompressionType(Compression canonical) {
+ this.canonical = canonical;
}
@Override
public String getSuggestedFilenameSuffix() {
- return filenameSuffix;
+ return canonical.getSuggestedSuffix();
}
@Override
@Nullable
public String getMimeType() {
- return mimeType;
+ return (canonical == Compression.UNCOMPRESSED) ? null : MimeTypes.BINARY;
+ }
+
+ @Override
+ public WritableByteChannel create(WritableByteChannel channel) throws IOException {
+ return canonical.writeCompressed(channel);
+ }
+
+ public static CompressionType fromCanonical(Compression canonical) {
+ switch(canonical) {
+ case AUTO:
+ throw new IllegalArgumentException("AUTO is not supported for writing");
+
+ case UNCOMPRESSED:
+ return UNCOMPRESSED;
+
+ case GZIP:
+ return GZIP;
+
+ case BZIP2:
+ return BZIP2;
+
+ case ZIP:
+ throw new IllegalArgumentException("ZIP is unsupported");
+
+ case DEFLATE:
+ return DEFLATE;
+
+ default:
+ throw new UnsupportedOperationException("Unsupported compression type: " + canonical);
+ }
}
}
@@ -208,7 +214,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
/**
* The {@link WritableByteChannelFactory} that is used to wrap the raw data output to the
* underlying channel. The default is to not compress the output using {@link
- * CompressionType#UNCOMPRESSED}.
+ * Compression#UNCOMPRESSED}.
*/
private final WritableByteChannelFactory writableByteChannelFactory;
@@ -328,7 +334,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
* When a sink has requested windowed or triggered output, this method will be invoked to return
* the file {@link ResourceId resource} to be created given the base output directory and a
* {@link OutputFileHints} containing information about the file, including a suggested
- * extension (e.g. coming from {@link CompressionType}).
+ * extension (e.g. coming from {@link Compression}).
*
* <p>The policy must return unique and consistent filenames for different windows and panes.
*/
@@ -344,7 +350,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
* When a sink has not requested windowed or triggered output, this method will be invoked to
* return the file {@link ResourceId resource} to be created given the base output directory and
* a {@link OutputFileHints} containing information about the file, including a suggested (e.g.
- * coming from {@link CompressionType}).
+ * coming from {@link Compression}).
*
* <p>The shardNumber and numShards parameters, should be used by the policy to generate unique
* and consistent filenames.
@@ -375,7 +381,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
public FileBasedSink(
ValueProvider<ResourceId> tempDirectoryProvider,
DynamicDestinations<?, DestinationT, OutputT> dynamicDestinations) {
- this(tempDirectoryProvider, dynamicDestinations, CompressionType.UNCOMPRESSED);
+ this(tempDirectoryProvider, dynamicDestinations, Compression.UNCOMPRESSED);
}
/** Construct a {@link FileBasedSink} with the given temp directory and output channel type. */
@@ -390,6 +396,15 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
this.writableByteChannelFactory = writableByteChannelFactory;
}
+ /** Construct a {@link FileBasedSink} with the given temp directory and {@link Compression}. */
+ @Experimental(Kind.FILESYSTEM)
+ public FileBasedSink(
+ ValueProvider<ResourceId> tempDirectoryProvider,
+ DynamicDestinations<?, DestinationT, OutputT> dynamicDestinations,
+ Compression compression) {
+ this(tempDirectoryProvider, dynamicDestinations, CompressionType.fromCanonical(compression));
+ }
+
/** Return the {@link DynamicDestinations} used. */
@SuppressWarnings("unchecked")
public DynamicDestinations<UserT, DestinationT, OutputT> getDynamicDestinations() {
@@ -799,7 +814,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
*
* <p>This is the default for the sink, but it may be overridden by a supplied {@link
* WritableByteChannelFactory}. For example, {@link TextIO.Write} uses {@link MimeTypes#TEXT} by
- * default but if {@link CompressionType#BZIP2} is set then the MIME type will be overridden to
+ * default but if {@link Compression#BZIP2} is set then the MIME type will be overridden to
* {@link MimeTypes#BINARY}.
*/
private final String mimeType;
@@ -1134,7 +1149,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
/**
* Returns the MIME type that should be used for the files that will hold the output data. May
* return {@code null} if this {@code WritableByteChannelFactory} does not meaningfully change
- * the MIME type (e.g., for {@link CompressionType#UNCOMPRESSED}).
+ * the MIME type (e.g., for {@link Compression#UNCOMPRESSED}).
*
* @see MimeTypes
* @see <a href=
@@ -1144,7 +1159,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
String getMimeType();
/**
- * @return an optional filename suffix, eg, ".gz" is returned by {@link CompressionType#GZIP}
+ * @return an optional filename suffix, eg, ".gz" is returned for {@link Compression#GZIP}
*/
@Nullable
String getSuggestedFilenameSuffix();
http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index 526c50e..ddedd00 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -42,7 +42,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.PBegin;
@@ -64,7 +63,7 @@ public class TFRecordIO {
public static Read read() {
return new AutoValue_TFRecordIO_Read.Builder()
.setValidate(true)
- .setCompressionType(CompressionType.AUTO)
+ .setCompression(Compression.AUTO)
.build();
}
@@ -78,7 +77,7 @@ public class TFRecordIO {
.setShardTemplate(null)
.setFilenameSuffix(null)
.setNumShards(0)
- .setCompressionType(CompressionType.NONE)
+ .setCompression(Compression.UNCOMPRESSED)
.build();
}
@@ -90,7 +89,7 @@ public class TFRecordIO {
abstract boolean getValidate();
- abstract CompressionType getCompressionType();
+ abstract Compression getCompression();
abstract Builder toBuilder();
@@ -98,7 +97,7 @@ public class TFRecordIO {
abstract static class Builder {
abstract Builder setFilepattern(ValueProvider<String> filepattern);
abstract Builder setValidate(boolean validate);
- abstract Builder setCompressionType(CompressionType compressionType);
+ abstract Builder setCompression(Compression compression);
abstract Read build();
}
@@ -134,18 +133,22 @@ public class TFRecordIO {
return toBuilder().setValidate(false).build();
}
+ /** @deprecated Use {@link #withCompression}. */
+ @Deprecated
+ public Read withCompressionType(TFRecordIO.CompressionType compressionType) {
+ return withCompression(compressionType.canonical);
+ }
+
/**
- * Returns a transform for reading TFRecord files that decompresses all input files
- * using the specified compression type.
+ * Returns a transform for reading TFRecord files that decompresses all input files using the
+ * specified compression type.
*
- * <p>If no compression type is specified, the default is
- * {@link TFRecordIO.CompressionType#AUTO}.
- * In this mode, the compression type of the file is determined by its extension
- * (e.g., {@code *.gz} is gzipped, {@code *.zlib} is zlib compressed, and all other
- * extensions are uncompressed).
+ * <p>If no compression type is specified, the default is {@link Compression#AUTO}. In this
+ * mode, the compression type of the file is determined by its extension via {@link
+ * Compression#detect(String)}.
*/
- public Read withCompressionType(TFRecordIO.CompressionType compressionType) {
- return toBuilder().setCompressionType(compressionType).build();
+ public Read withCompression(Compression compression) {
+ return toBuilder().setCompression(compression).build();
}
@Override
@@ -174,29 +177,15 @@ public class TFRecordIO {
// Helper to create a source specific to the requested compression type.
protected FileBasedSource<byte[]> getSource() {
- switch (getCompressionType()) {
- case NONE:
- return new TFRecordSource(getFilepattern());
- case AUTO:
- return CompressedSource.from(new TFRecordSource(getFilepattern()));
- case GZIP:
- return
- CompressedSource.from(new TFRecordSource(getFilepattern()))
- .withDecompression(CompressedSource.CompressionMode.GZIP);
- case ZLIB:
- return
- CompressedSource.from(new TFRecordSource(getFilepattern()))
- .withDecompression(CompressedSource.CompressionMode.DEFLATE);
- default:
- throw new IllegalArgumentException("Unknown compression type: " + getCompressionType());
- }
+ return CompressedSource.from(new TFRecordSource(getFilepattern()))
+ .withCompression(getCompression());
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
- .add(DisplayData.item("compressionType", getCompressionType().toString())
+ .add(DisplayData.item("compressionType", getCompression().toString())
.withLabel("Compression Type"))
.addIfNotDefault(DisplayData.item("validation", getValidate())
.withLabel("Validation Enabled"), true)
@@ -223,7 +212,7 @@ public class TFRecordIO {
@Nullable abstract String getShardTemplate();
/** Option to indicate the output sink's compression type. Default is NONE. */
- abstract CompressionType getCompressionType();
+ abstract Compression getCompression();
abstract Builder toBuilder();
@@ -237,7 +226,7 @@ public class TFRecordIO {
abstract Builder setNumShards(int numShards);
- abstract Builder setCompressionType(CompressionType compressionType);
+ abstract Builder setCompression(Compression compression);
abstract Write build();
}
@@ -326,15 +315,20 @@ public class TFRecordIO {
return withNumShards(1).withShardNameTemplate("");
}
+ /** @deprecated Use {@link #withCompression}. */
+ @Deprecated
+ public Write withCompressionType(CompressionType compressionType) {
+ return withCompression(compressionType.canonical);
+ }
+
/**
* Writes to output files using the specified compression type.
*
- * <p>If no compression type is specified, the default is
- * {@link TFRecordIO.CompressionType#NONE}.
- * See {@link TFRecordIO.Read#withCompressionType} for more details.
+ * <p>If no compression type is specified, the default is {@link Compression#UNCOMPRESSED}. See
+ * {@link TFRecordIO.Read#withCompression} for more details.
*/
- public Write withCompressionType(CompressionType compressionType) {
- return toBuilder().setCompressionType(compressionType).build();
+ public Write withCompression(Compression compression) {
+ return toBuilder().setCompression(compression).build();
}
@Override
@@ -347,7 +341,7 @@ public class TFRecordIO {
getOutputPrefix(),
getShardTemplate(),
getFilenameSuffix(),
- getCompressionType()));
+ getCompression()));
if (getNumShards() > 0) {
write = write.withNumShards(getNumShards());
}
@@ -366,45 +360,35 @@ public class TFRecordIO {
.withLabel("Output Shard Name Template"))
.addIfNotDefault(DisplayData.item("numShards", getNumShards())
.withLabel("Maximum Output Shards"), 0)
- .add(DisplayData.item("compressionType", getCompressionType().toString())
+ .add(DisplayData.item("compressionType", getCompression().toString())
.withLabel("Compression Type"));
}
}
- /**
- * Possible TFRecord file compression types.
- */
+ /** @deprecated Use {@link Compression}. */
+ @Deprecated
public enum CompressionType {
- /**
- * Automatically determine the compression type based on filename extension.
- */
- AUTO(""),
- /**
- * Uncompressed.
- */
- NONE(""),
- /**
- * GZipped.
- */
- GZIP(".gz"),
- /**
- * ZLIB compressed.
- */
- ZLIB(".zlib");
+ /** @see Compression#AUTO */
+ AUTO(Compression.AUTO),
+
+ /** @see Compression#UNCOMPRESSED */
+ NONE(Compression.UNCOMPRESSED),
+
+ /** @see Compression#GZIP */
+ GZIP(Compression.GZIP),
+
+ /** @see Compression#DEFLATE */
+ ZLIB(Compression.DEFLATE);
- private String filenameSuffix;
+ private Compression canonical;
- CompressionType(String suffix) {
- this.filenameSuffix = suffix;
+ CompressionType(Compression canonical) {
+ this.canonical = canonical;
}
- /**
- * Determine if a given filename matches a compression type based on its extension.
- * @param filename the filename to match
- * @return true iff the filename ends with the compression type's known extension.
- */
+ /** @see Compression#matches */
public boolean matches(String filename) {
- return filename.toLowerCase().endsWith(filenameSuffix.toLowerCase());
+ return canonical.matches(filename);
}
}
@@ -419,11 +403,6 @@ public class TFRecordIO {
@VisibleForTesting
static class TFRecordSource extends FileBasedSource<byte[]> {
@VisibleForTesting
- TFRecordSource(String fileSpec) {
- super(StaticValueProvider.of(fileSpec), 1L);
- }
-
- @VisibleForTesting
TFRecordSource(ValueProvider<String> fileSpec) {
super(fileSpec, Long.MAX_VALUE);
}
@@ -452,7 +431,7 @@ public class TFRecordIO {
}
@Override
- protected boolean isSplittable() throws Exception {
+ protected boolean isSplittable() {
// TFRecord files are not splittable
return false;
}
@@ -528,20 +507,13 @@ public class TFRecordIO {
ValueProvider<ResourceId> outputPrefix,
@Nullable String shardTemplate,
@Nullable String suffix,
- TFRecordIO.CompressionType compressionType) {
+ Compression compression) {
super(
outputPrefix,
DynamicFileDestinations.<byte[]>constant(
DefaultFilenamePolicy.fromStandardParameters(
outputPrefix, shardTemplate, suffix, false)),
- writableByteChannelFactory(compressionType));
- }
-
- private static class ExtractDirectory implements SerializableFunction<ResourceId, ResourceId> {
- @Override
- public ResourceId apply(ResourceId input) {
- return input.getCurrentDirectory();
- }
+ compression);
}
@Override
@@ -549,21 +521,6 @@ public class TFRecordIO {
return new TFRecordWriteOperation(this);
}
- private static WritableByteChannelFactory writableByteChannelFactory(
- TFRecordIO.CompressionType compressionType) {
- switch (compressionType) {
- case AUTO:
- throw new IllegalArgumentException("Unsupported compression type AUTO");
- case NONE:
- return CompressionType.UNCOMPRESSED;
- case GZIP:
- return CompressionType.GZIP;
- case ZLIB:
- return CompressionType.DEFLATE;
- }
- return CompressionType.UNCOMPRESSED;
- }
-
/** A {@link WriteOperation WriteOperation} for TFRecord files. */
private static class TFRecordWriteOperation extends WriteOperation<Void, byte[]> {
private TFRecordWriteOperation(TFRecordSink sink) {
http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 442e4d9..76102cb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -32,7 +32,6 @@ import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.CompressedSource.CompressionMode;
import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
@@ -135,7 +134,7 @@ import org.joda.time.Duration;
* PCollection<String> lines = ...;
* lines.apply(TextIO.write().to("/path/to/file.txt"))
* .withSuffix(".txt")
- * .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP));
+ * .withCompression(Compression.GZIP));
* }</pre>
*
* <p>Any existing files with the same names as generated output files will be overwritten.
@@ -188,7 +187,7 @@ public class TextIO {
*/
public static Read read() {
return new AutoValue_TextIO_Read.Builder()
- .setCompressionType(CompressionType.AUTO)
+ .setCompression(Compression.AUTO)
.setHintMatchesManyFiles(false)
.setEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)
.build();
@@ -206,7 +205,7 @@ public class TextIO {
*/
public static ReadAll readAll() {
return new AutoValue_TextIO_ReadAll.Builder()
- .setCompressionType(CompressionType.AUTO)
+ .setCompression(Compression.AUTO)
// 64MB is a reasonable value that allows to amortize the cost of opening files,
// but is not so large as to exhaust a typical runner's maximum amount of output per
// ProcessElement call.
@@ -257,7 +256,7 @@ public class TextIO {
@AutoValue
public abstract static class Read extends PTransform<PBegin, PCollection<String>> {
@Nullable abstract ValueProvider<String> getFilepattern();
- abstract CompressionType getCompressionType();
+ abstract Compression getCompression();
@Nullable
abstract Duration getWatchForNewFilesInterval();
@@ -273,7 +272,7 @@ public class TextIO {
@AutoValue.Builder
abstract static class Builder {
abstract Builder setFilepattern(ValueProvider<String> filepattern);
- abstract Builder setCompressionType(CompressionType compressionType);
+ abstract Builder setCompression(Compression compression);
abstract Builder setWatchForNewFilesInterval(Duration watchForNewFilesInterval);
abstract Builder setWatchForNewFilesTerminationCondition(
TerminationCondition<?, ?> condition);
@@ -307,13 +306,19 @@ public class TextIO {
return toBuilder().setFilepattern(filepattern).build();
}
+ /** @deprecated Use {@link #withCompression}. */
+ @Deprecated
+ public Read withCompressionType(TextIO.CompressionType compressionType) {
+ return withCompression(compressionType.canonical);
+ }
+
/**
* Reads from input sources using the specified compression type.
*
- * <p>If no compression type is specified, the default is {@link TextIO.CompressionType#AUTO}.
+ * <p>If no compression type is specified, the default is {@link Compression#AUTO}.
*/
- public Read withCompressionType(TextIO.CompressionType compressionType) {
- return toBuilder().setCompressionType(compressionType).build();
+ public Read withCompression(Compression compression) {
+ return toBuilder().setCompression(compression).build();
}
/**
@@ -364,7 +369,7 @@ public class TextIO {
// All other cases go through ReadAll.
ReadAll readAll =
readAll()
- .withCompressionType(getCompressionType())
+ .withCompression(getCompression())
.withEmptyMatchTreatment(getEmptyMatchTreatment());
if (getWatchForNewFilesInterval() != null) {
TerminationCondition<String, ?> readAllCondition =
@@ -378,37 +383,8 @@ public class TextIO {
// Helper to create a source specific to the requested compression type.
protected FileBasedSource<String> getSource() {
- return wrapWithCompression(
- new TextSource(getFilepattern(), getEmptyMatchTreatment()),
- getCompressionType());
- }
-
- private static FileBasedSource<String> wrapWithCompression(
- FileBasedSource<String> source, CompressionType compressionType) {
- switch (compressionType) {
- case UNCOMPRESSED:
- return source;
- case AUTO:
- return CompressedSource.from(source);
- case BZIP2:
- return
- CompressedSource.from(source)
- .withDecompression(CompressionMode.BZIP2);
- case GZIP:
- return
- CompressedSource.from(source)
- .withDecompression(CompressionMode.GZIP);
- case ZIP:
- return
- CompressedSource.from(source)
- .withDecompression(CompressionMode.ZIP);
- case DEFLATE:
- return
- CompressedSource.from(source)
- .withDecompression(CompressionMode.DEFLATE);
- default:
- throw new IllegalArgumentException("Unknown compression type: " + compressionType);
- }
+ return CompressedSource.from(new TextSource(getFilepattern(), getEmptyMatchTreatment()))
+ .withCompression(getCompression());
}
@Override
@@ -416,7 +392,7 @@ public class TextIO {
super.populateDisplayData(builder);
builder
.add(
- DisplayData.item("compressionType", getCompressionType().toString())
+ DisplayData.item("compressionType", getCompression().toString())
.withLabel("Compression Type"))
.addIfNotNull(
DisplayData.item("filePattern", getFilepattern()).withLabel("File Pattern"))
@@ -435,7 +411,7 @@ public class TextIO {
@AutoValue
public abstract static class ReadAll
extends PTransform<PCollection<String>, PCollection<String>> {
- abstract CompressionType getCompressionType();
+ abstract Compression getCompression();
@Nullable
abstract Duration getWatchForNewFilesInterval();
@@ -450,7 +426,7 @@ public class TextIO {
@AutoValue.Builder
abstract static class Builder {
- abstract Builder setCompressionType(CompressionType compressionType);
+ abstract Builder setCompression(Compression compression);
abstract Builder setWatchForNewFilesInterval(Duration watchForNewFilesInterval);
abstract Builder setWatchForNewFilesTerminationCondition(
TerminationCondition<String, ?> condition);
@@ -460,9 +436,19 @@ public class TextIO {
abstract ReadAll build();
}
- /** Same as {@link Read#withCompressionType(CompressionType)}. */
- public ReadAll withCompressionType(CompressionType compressionType) {
- return toBuilder().setCompressionType(compressionType).build();
+ /** @deprecated Use {@link #withCompression}. */
+ @Deprecated
+ public ReadAll withCompressionType(TextIO.CompressionType compressionType) {
+ return withCompression(compressionType.canonical);
+ }
+
+ /**
+ * Reads from input sources using the specified compression type.
+ *
+ * <p>If no compression type is specified, the default is {@link Compression#AUTO}.
+ */
+ public ReadAll withCompression(Compression compression) {
+ return toBuilder().setCompression(compression).build();
}
/** Same as {@link Read#withEmptyMatchTreatment}. */
@@ -499,9 +485,9 @@ public class TextIO {
.apply(
"Read all via FileBasedSource",
new ReadAllViaFileBasedSource<>(
- new IsSplittableFn(getCompressionType()),
+ new IsSplittableFn(getCompression()),
getDesiredBundleSizeBytes(),
- new CreateTextSourceFn(getCompressionType(), getEmptyMatchTreatment())))
+ new CreateTextSourceFn(getCompression(), getEmptyMatchTreatment())))
.setCoder(StringUtf8Coder.of());
}
@@ -510,39 +496,39 @@ public class TextIO {
super.populateDisplayData(builder);
builder.add(
- DisplayData.item("compressionType", getCompressionType().toString())
+ DisplayData.item("compressionType", getCompression().toString())
.withLabel("Compression Type"));
}
private static class CreateTextSourceFn
implements SerializableFunction<String, FileBasedSource<String>> {
- private final CompressionType compressionType;
+ private final Compression compression;
private final EmptyMatchTreatment emptyMatchTreatment;
private CreateTextSourceFn(
- CompressionType compressionType, EmptyMatchTreatment emptyMatchTreatment) {
- this.compressionType = compressionType;
+ Compression compression, EmptyMatchTreatment emptyMatchTreatment) {
+ this.compression = compression;
this.emptyMatchTreatment = emptyMatchTreatment;
}
@Override
public FileBasedSource<String> apply(String input) {
- return Read.wrapWithCompression(
- new TextSource(StaticValueProvider.of(input), emptyMatchTreatment), compressionType);
+ return CompressedSource.from(
+ new TextSource(StaticValueProvider.of(input), emptyMatchTreatment))
+ .withCompression(compression);
}
}
private static class IsSplittableFn implements SerializableFunction<String, Boolean> {
- private final CompressionType compressionType;
+ private final Compression compression;
- private IsSplittableFn(CompressionType compressionType) {
- this.compressionType = compressionType;
+ private IsSplittableFn(Compression compression) {
+ this.compression = compression;
}
@Override
public Boolean apply(String filename) {
- return compressionType == CompressionType.UNCOMPRESSED
- || (compressionType == CompressionType.AUTO && !CompressionMode.isCompressed(filename));
+ return !compression.isCompressed(filename);
}
}
}
@@ -811,7 +797,7 @@ public class TextIO {
/**
* Returns a transform for writing to text files like this one but that has the given {@link
* WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output. The
- * default is value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
+ * default value is {@link Compression#UNCOMPRESSED}.
*
* <p>A {@code null} value will reset the value to the default value mentioned above.
*/
@@ -821,6 +807,16 @@ public class TextIO {
}
/**
+ * Returns a transform for writing to text files like this one but that compresses output using
+ * the given {@link Compression}. The default value is {@link Compression#UNCOMPRESSED}.
+ */
+ public TypedWrite<UserT> withCompression(Compression compression) {
+ checkArgument(compression != null, "compression can not be null");
+ return withWritableByteChannelFactory(
+ FileBasedSink.CompressionType.fromCanonical(compression));
+ }
+
+ /**
* Preserves windowing of input elements and writes them to files based on the element's window.
*
* <p>If using {@link #to(FileBasedSink.FilenamePolicy)}. Filenames will be generated using
@@ -1063,48 +1059,36 @@ public class TextIO {
}
}
- /**
- * Possible text file compression types.
- */
+ /** @deprecated Use {@link Compression}. */
+ @Deprecated
public enum CompressionType {
- /**
- * Automatically determine the compression type based on filename extension.
- */
- AUTO(""),
- /**
- * Uncompressed (i.e., may be split).
- */
- UNCOMPRESSED(""),
- /**
- * GZipped.
- */
- GZIP(".gz"),
- /**
- * BZipped.
- */
- BZIP2(".bz2"),
- /**
- * Zipped.
- */
- ZIP(".zip"),
- /**
- * Deflate compressed.
- */
- DEFLATE(".deflate");
+ /** @see Compression#AUTO */
+ AUTO(Compression.AUTO),
+
+ /** @see Compression#UNCOMPRESSED */
+ UNCOMPRESSED(Compression.UNCOMPRESSED),
+
+ /** @see Compression#GZIP */
+ GZIP(Compression.GZIP),
+
+ /** @see Compression#BZIP2 */
+ BZIP2(Compression.BZIP2),
- private String filenameSuffix;
+ /** @see Compression#ZIP */
+ ZIP(Compression.ZIP),
- CompressionType(String suffix) {
- this.filenameSuffix = suffix;
+ /** @see Compression#DEFLATE */
+ DEFLATE(Compression.DEFLATE);
+
+ private Compression canonical;
+
+ CompressionType(Compression canonical) {
+ this.canonical = canonical;
}
- /**
- * Determine if a given filename matches a compression type based on its extension.
- * @param filename the filename to match
- * @return true iff the filename ends with the compression type's known extension.
- */
+ /** @see Compression#matches */
public boolean matches(String filename) {
- return filename.toLowerCase().endsWith(filenameSuffix.toLowerCase());
+ return canonical.matches(filename);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
index fe6f01f..352d38a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
@@ -358,7 +358,7 @@ public class CompressedSourceTest {
}
@Test
- public void testUncompressedFileIsSplittable() throws Exception {
+ public void testUncompressedFileWithAutoIsSplittable() throws Exception {
String baseName = "test-input";
File uncompressedFile = tmpFolder.newFile(baseName + ".bin");
@@ -370,6 +370,21 @@ public class CompressedSourceTest {
SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create());
}
+
+ @Test
+ public void testUncompressedFileWithUncompressedIsSplittable() throws Exception {
+ String baseName = "test-input";
+
+ File uncompressedFile = tmpFolder.newFile(baseName + ".bin");
+ Files.write(generateInput(10), uncompressedFile);
+
+ CompressedSource<Byte> source =
+ CompressedSource.from(new ByteSource(uncompressedFile.getPath(), 1))
+ .withDecompression(CompressionMode.UNCOMPRESSED);
+ assertTrue(source.isSplittable());
+ SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create());
+ }
+
@Test
public void testGzipFileIsNotSplittable() throws Exception {
String baseName = "test-input";
http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index ff30e33..fd8ad80 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -48,7 +48,6 @@ import java.util.zip.GZIPInputStream;
import org.apache.beam.sdk.io.FileBasedSink.CompressionType;
import org.apache.beam.sdk.io.FileBasedSink.FileResult;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
-import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
import org.apache.beam.sdk.io.FileBasedSink.WriteOperation;
import org.apache.beam.sdk.io.FileBasedSink.Writer;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
@@ -229,7 +228,7 @@ public class FileBasedSinkTest {
String prefix = "file";
SimpleSink<Void> sink =
SimpleSink.makeSimpleSink(
- getBaseOutputDirectory(), prefix, "", "", CompressionType.UNCOMPRESSED);
+ getBaseOutputDirectory(), prefix, "", "", Compression.UNCOMPRESSED);
WriteOperation<Void, String> writeOp =
new SimpleSink.SimpleWriteOperation<>(sink, tempDirectory);
@@ -320,7 +319,7 @@ public class FileBasedSinkTest {
SimpleSink<Void> sink =
SimpleSink.makeSimpleSink(
- root, "file", ".SSSSS.of.NNNNN", ".test", CompressionType.UNCOMPRESSED);
+ root, "file", ".SSSSS.of.NNNNN", ".test", Compression.UNCOMPRESSED);
FilenamePolicy policy = sink.getDynamicDestinations().getFilenamePolicy(null);
expected =
@@ -347,7 +346,7 @@ public class FileBasedSinkTest {
public void testCollidingOutputFilenames() throws IOException {
ResourceId root = getBaseOutputDirectory();
SimpleSink<Void> sink =
- SimpleSink.makeSimpleSink(root, "file", "-NN", "test", CompressionType.UNCOMPRESSED);
+ SimpleSink.makeSimpleSink(root, "file", "-NN", "test", Compression.UNCOMPRESSED);
SimpleSink.SimpleWriteOperation<Void> writeOp = new SimpleSink.SimpleWriteOperation<>(sink);
ResourceId temp1 = root.resolve("temp1", StandardResolveOptions.RESOLVE_FILE);
@@ -376,7 +375,7 @@ public class FileBasedSinkTest {
ResourceId root = getBaseOutputDirectory();
SimpleSink<Void> sink =
SimpleSink.makeSimpleSink(
- root, "file", "-SSSSS-of-NNNNN", "", CompressionType.UNCOMPRESSED);
+ root, "file", "-SSSSS-of-NNNNN", "", Compression.UNCOMPRESSED);
FilenamePolicy policy = sink.getDynamicDestinations().getFilenamePolicy(null);
expected =
@@ -398,11 +397,11 @@ public class FileBasedSinkTest {
assertEquals(expected, actual);
}
- /** {@link CompressionType#BZIP2} correctly writes BZip2 data. */
+ /** {@link Compression#BZIP2} correctly writes BZip2 data. */
@Test
- public void testCompressionTypeBZIP2() throws FileNotFoundException, IOException {
+ public void testCompressionBZIP2() throws FileNotFoundException, IOException {
final File file =
- writeValuesWithWritableByteChannelFactory(CompressionType.BZIP2, "abc", "123");
+ writeValuesWithCompression(Compression.BZIP2, "abc", "123");
// Read Bzip2ed data back in using Apache commons API (de facto standard).
assertReadValues(
new BufferedReader(
@@ -413,10 +412,10 @@ public class FileBasedSinkTest {
"123");
}
- /** {@link CompressionType#GZIP} correctly writes Gzipped data. */
+ /** {@link Compression#GZIP} correctly writes Gzipped data. */
@Test
- public void testCompressionTypeGZIP() throws FileNotFoundException, IOException {
- final File file = writeValuesWithWritableByteChannelFactory(CompressionType.GZIP, "abc", "123");
+ public void testCompressionGZIP() throws FileNotFoundException, IOException {
+ final File file = writeValuesWithCompression(Compression.GZIP, "abc", "123");
// Read Gzipped data back in using standard API.
assertReadValues(
new BufferedReader(
@@ -426,11 +425,11 @@ public class FileBasedSinkTest {
"123");
}
- /** {@link CompressionType#DEFLATE} correctly writes deflate data. */
+ /** {@link Compression#DEFLATE} correctly writes deflate data. */
@Test
- public void testCompressionTypeDEFLATE() throws FileNotFoundException, IOException {
+ public void testCompressionDEFLATE() throws FileNotFoundException, IOException {
final File file =
- writeValuesWithWritableByteChannelFactory(CompressionType.DEFLATE, "abc", "123");
+ writeValuesWithCompression(Compression.DEFLATE, "abc", "123");
// Read Gzipped data back in using standard API.
assertReadValues(
new BufferedReader(
@@ -441,11 +440,11 @@ public class FileBasedSinkTest {
"123");
}
- /** {@link CompressionType#UNCOMPRESSED} correctly writes uncompressed data. */
+ /** {@link Compression#UNCOMPRESSED} correctly writes uncompressed data. */
@Test
- public void testCompressionTypeUNCOMPRESSED() throws FileNotFoundException, IOException {
+ public void testCompressionUNCOMPRESSED() throws FileNotFoundException, IOException {
final File file =
- writeValuesWithWritableByteChannelFactory(CompressionType.UNCOMPRESSED, "abc", "123");
+ writeValuesWithCompression(Compression.UNCOMPRESSED, "abc", "123");
// Read uncompressed data back in using standard API.
assertReadValues(
new BufferedReader(
@@ -462,11 +461,11 @@ public class FileBasedSinkTest {
}
}
- private File writeValuesWithWritableByteChannelFactory(
- final WritableByteChannelFactory factory, String... values) throws IOException {
+ private File writeValuesWithCompression(
+ Compression compression, String... values) throws IOException {
final File file = tmpFolder.newFile("test.gz");
final WritableByteChannel channel =
- factory.create(Channels.newChannel(new FileOutputStream(file)));
+ compression.writeCompressed(Channels.newChannel(new FileOutputStream(file)));
for (String value : values) {
channel.write(ByteBuffer.wrap((value + "\n").getBytes(StandardCharsets.UTF_8)));
}
@@ -512,7 +511,7 @@ public class FileBasedSinkTest {
/** Build a SimpleSink with default options. */
private SimpleSink<Void> buildSink() {
return SimpleSink.makeSimpleSink(
- getBaseOutputDirectory(), "file", "-SS-of-NN", ".test", CompressionType.UNCOMPRESSED);
+ getBaseOutputDirectory(), "file", "-SS-of-NN", ".test", Compression.UNCOMPRESSED);
}
/** Build a SimpleWriteOperation with default options and the given temporary directory. */
http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
index 382898d..b59876f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
@@ -36,12 +36,19 @@ class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT, Strin
super(StaticValueProvider.of(tempDirectory), dynamicDestinations, writableByteChannelFactory);
}
+ public SimpleSink(
+ ResourceId tempDirectory,
+ DynamicDestinations<String, DestinationT, String> dynamicDestinations,
+ Compression compression) {
+ super(StaticValueProvider.of(tempDirectory), dynamicDestinations, compression);
+ }
+
public static SimpleSink<Void> makeSimpleSink(
ResourceId tempDirectory, FilenamePolicy filenamePolicy) {
return new SimpleSink<>(
tempDirectory,
DynamicFileDestinations.<String>constant(filenamePolicy),
- CompressionType.UNCOMPRESSED);
+ Compression.UNCOMPRESSED);
}
public static SimpleSink<Void> makeSimpleSink(
@@ -61,6 +68,20 @@ class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT, Strin
return new SimpleSink<>(baseDirectory, dynamicDestinations, writableByteChannelFactory);
}
+ public static SimpleSink<Void> makeSimpleSink(
+ ResourceId baseDirectory,
+ String prefix,
+ String shardTemplate,
+ String suffix,
+ Compression compression) {
+ return makeSimpleSink(
+ baseDirectory,
+ prefix,
+ shardTemplate,
+ suffix,
+ FileBasedSink.CompressionType.fromCanonical(compression));
+ }
+
@Override
public SimpleWriteOperation<DestinationT> createWriteOperation() {
return new SimpleWriteOperation<>(this);
http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
index d564d3b..6e5e4da 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
@@ -17,11 +17,10 @@
*/
package org.apache.beam.sdk.io;
-import static org.apache.beam.sdk.io.TFRecordIO.CompressionType;
-import static org.apache.beam.sdk.io.TFRecordIO.CompressionType.AUTO;
-import static org.apache.beam.sdk.io.TFRecordIO.CompressionType.GZIP;
-import static org.apache.beam.sdk.io.TFRecordIO.CompressionType.NONE;
-import static org.apache.beam.sdk.io.TFRecordIO.CompressionType.ZLIB;
+import static org.apache.beam.sdk.io.Compression.AUTO;
+import static org.apache.beam.sdk.io.Compression.DEFLATE;
+import static org.apache.beam.sdk.io.Compression.GZIP;
+import static org.apache.beam.sdk.io.Compression.UNCOMPRESSED;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.Matchers.isIn;
import static org.junit.Assert.assertEquals;
@@ -144,7 +143,7 @@ public class TFRecordIOTest {
public void testReadDisplayData() {
TFRecordIO.Read read = TFRecordIO.read()
.from("foo.*")
- .withCompressionType(GZIP)
+ .withCompression(GZIP)
.withoutValidation();
DisplayData displayData = DisplayData.from(read);
@@ -161,7 +160,7 @@ public class TFRecordIOTest {
.withSuffix("bar")
.withShardNameTemplate("-SS-of-NN-")
.withNumShards(100)
- .withCompressionType(GZIP);
+ .withCompression(GZIP);
DisplayData displayData = DisplayData.from(write);
@@ -265,25 +264,25 @@ public class TFRecordIOTest {
@Test
@Category(NeedsRunner.class)
public void runTestRoundTrip() throws IOException {
- runTestRoundTrip(LARGE, 10, ".tfrecords", NONE, NONE);
+ runTestRoundTrip(LARGE, 10, ".tfrecords", UNCOMPRESSED, UNCOMPRESSED);
}
@Test
@Category(NeedsRunner.class)
public void runTestRoundTripWithEmptyData() throws IOException {
- runTestRoundTrip(EMPTY, 10, ".tfrecords", NONE, NONE);
+ runTestRoundTrip(EMPTY, 10, ".tfrecords", UNCOMPRESSED, UNCOMPRESSED);
}
@Test
@Category(NeedsRunner.class)
public void runTestRoundTripWithOneShards() throws IOException {
- runTestRoundTrip(LARGE, 1, ".tfrecords", NONE, NONE);
+ runTestRoundTrip(LARGE, 1, ".tfrecords", UNCOMPRESSED, UNCOMPRESSED);
}
@Test
@Category(NeedsRunner.class)
public void runTestRoundTripWithSuffix() throws IOException {
- runTestRoundTrip(LARGE, 10, ".suffix", NONE, NONE);
+ runTestRoundTrip(LARGE, 10, ".suffix", UNCOMPRESSED, UNCOMPRESSED);
}
@Test
@@ -295,13 +294,13 @@ public class TFRecordIOTest {
@Test
@Category(NeedsRunner.class)
public void runTestRoundTripZlib() throws IOException {
- runTestRoundTrip(LARGE, 10, ".tfrecords", ZLIB, ZLIB);
+ runTestRoundTrip(LARGE, 10, ".tfrecords", DEFLATE, DEFLATE);
}
@Test
@Category(NeedsRunner.class)
public void runTestRoundTripUncompressedFilesWithAuto() throws IOException {
- runTestRoundTrip(LARGE, 10, ".tfrecords", NONE, AUTO);
+ runTestRoundTrip(LARGE, 10, ".tfrecords", UNCOMPRESSED, AUTO);
}
@Test
@@ -313,14 +312,14 @@ public class TFRecordIOTest {
@Test
@Category(NeedsRunner.class)
public void runTestRoundTripZlibFilesWithAuto() throws IOException {
- runTestRoundTrip(LARGE, 10, ".tfrecords", ZLIB, AUTO);
+ runTestRoundTrip(LARGE, 10, ".tfrecords", DEFLATE, AUTO);
}
private void runTestRoundTrip(Iterable<String> elems,
int numShards,
String suffix,
- CompressionType writeCompressionType,
- CompressionType readCompressionType) throws IOException {
+ Compression writeCompression,
+ Compression readCompression) throws IOException {
String outputName = "file";
Path baseDir = Files.createTempDirectory(tempFolder, "test-rt");
String baseFilename = baseDir.resolve(outputName).toString();
@@ -328,14 +327,14 @@ public class TFRecordIOTest {
TFRecordIO.Write write = TFRecordIO.write().to(baseFilename)
.withNumShards(numShards)
.withSuffix(suffix)
- .withCompressionType(writeCompressionType);
+ .withCompression(writeCompression);
p.apply(Create.of(elems).withCoder(StringUtf8Coder.of()))
.apply(ParDo.of(new StringToByteArray()))
.apply(write);
p.run();
TFRecordIO.Read read = TFRecordIO.read().from(baseFilename + "*")
- .withCompressionType(readCompressionType);
+ .withCompression(readCompression);
PCollection<String> output = p2.apply(read).apply(ParDo.of(new ByteArrayToString()));
PAssert.that(output).containsInAnyOrder(elems);