You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/31 00:47:40 UTC

[2/3] beam git commit: Adds a canonical Compression enum for file-based IOs

Adds a canonical Compression enum for file-based IOs


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/54489f0d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/54489f0d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/54489f0d

Branch: refs/heads/master
Commit: 54489f0d52e354d8233bf297cce6ce451a05f6a5
Parents: afe8b0e
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Aug 18 16:17:20 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Wed Aug 30 17:40:52 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroSink.java   |   2 +-
 .../apache/beam/sdk/io/CompressedSource.java    | 292 ++++++-------------
 .../org/apache/beam/sdk/io/Compression.java     | 228 +++++++++++++++
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 113 +++----
 .../java/org/apache/beam/sdk/io/TFRecordIO.java | 153 ++++------
 .../java/org/apache/beam/sdk/io/TextIO.java     | 178 +++++------
 .../beam/sdk/io/CompressedSourceTest.java       |  17 +-
 .../apache/beam/sdk/io/FileBasedSinkTest.java   |  41 ++-
 .../java/org/apache/beam/sdk/io/SimpleSink.java |  23 +-
 .../org/apache/beam/sdk/io/TFRecordIOTest.java  |  35 ++-
 .../org/apache/beam/sdk/io/TextIOReadTest.java  |  81 +++--
 .../org/apache/beam/sdk/io/WriteFilesTest.java  |   9 +-
 .../java/org/apache/beam/sdk/io/xml/XmlIO.java  |  96 +++---
 13 files changed, 672 insertions(+), 596 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
index acd3ea6..888db85 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
@@ -40,7 +40,7 @@ class AvroSink<UserT, DestinationT, OutputT> extends FileBasedSink<UserT, Destin
       DynamicAvroDestinations<UserT, DestinationT, OutputT> dynamicDestinations,
       boolean genericRecords) {
     // Avro handle compression internally using the codec.
-    super(outputPrefix, dynamicDestinations, CompressionType.UNCOMPRESSED);
+    super(outputPrefix, dynamicDestinations, Compression.UNCOMPRESSED);
     this.dynamicDestinations = dynamicDestinations;
     this.genericRecords = genericRecords;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index 6943a02..ae55d80 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -20,28 +20,17 @@ package org.apache.beam.sdk.io;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.google.common.io.ByteStreams;
-import com.google.common.primitives.Ints;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.PushbackInputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.util.NoSuchElementException;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipInputStream;
 import javax.annotation.concurrent.GuardedBy;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
-import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream;
-import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
 import org.joda.time.Instant;
 
 /**
@@ -54,21 +43,20 @@ import org.joda.time.Instant;
  * FileBasedSource<T> mySource = ...;
  * PCollection<T> collection = p.apply(Read.from(CompressedSource
  *     .from(mySource)
- *     .withDecompression(CompressedSource.CompressionMode.GZIP)));
+ *     .withCompression(Compression.GZIP)));
  * } </pre>
  *
- * <p>Supported compression algorithms are {@link CompressionMode#GZIP},
- * {@link CompressionMode#BZIP2}, {@link CompressionMode#ZIP} and {@link CompressionMode#DEFLATE}.
- * User-defined compression types are supported by implementing
+ * <p>Supported compression algorithms are {@link Compression#GZIP},
+ * {@link Compression#BZIP2}, {@link Compression#ZIP} and {@link Compression#DEFLATE}.
+ * User-defined compression types are supported by implementing a
  * {@link DecompressingChannelFactory}.
  *
  * <p>By default, the compression algorithm is selected from those supported in
- * {@link CompressionMode} based on the file name provided to the source, namely
- * {@code ".bz2"} indicates {@link CompressionMode#BZIP2}, {@code ".gz"} indicates
- * {@link CompressionMode#GZIP}, {@code ".zip"} indicates {@link CompressionMode#ZIP} and
- * {@code ".deflate"} indicates {@link CompressionMode#DEFLATE}. If the file name does not match
- * any of the supported
- * algorithms, it is assumed to be uncompressed data.
+ * {@link Compression} based on the file name provided to the source, namely
+ * {@code ".bz2"} indicates {@link Compression#BZIP2}, {@code ".gz"} indicates
+ * {@link Compression#GZIP}, {@code ".zip"} indicates {@link Compression#ZIP} and
+ * {@code ".deflate"} indicates {@link Compression#DEFLATE}. If the file name does not match
+ * any of the supported algorithms, it is assumed to be uncompressed data.
  *
  * @param <T> The type to read from the compressed file.
  */
@@ -85,197 +73,75 @@ public class CompressedSource<T> extends FileBasedSource<T> {
         throws IOException;
   }
 
-  /**
-   * Factory interface for creating channels that decompress the content of an underlying channel,
-   * based on both the channel and the file name.
-   */
-  private interface FileNameBasedDecompressingChannelFactory
-      extends DecompressingChannelFactory {
-    /**
-     * Given a channel, create a channel that decompresses the content read from the channel.
-     */
-    ReadableByteChannel createDecompressingChannel(String fileName, ReadableByteChannel channel)
-        throws IOException;
-  }
-
-  /**
-   * Default compression types supported by the {@code CompressedSource}.
-   */
+  /** @deprecated Use {@link Compression} instead */
+  @Deprecated
   public enum CompressionMode implements DecompressingChannelFactory {
-    /**
-     * Reads a byte channel assuming it is compressed with gzip.
-     */
-    GZIP {
-      @Override
-      public boolean matches(String fileName) {
-          return fileName.toLowerCase().endsWith(".gz");
-      }
-
-      @Override
-      public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
-          throws IOException {
-        // Determine if the input stream is gzipped. The input stream returned from the
-        // GCS connector may already be decompressed; GCS does this based on the
-        // content-encoding property.
-        PushbackInputStream stream = new PushbackInputStream(Channels.newInputStream(channel), 2);
-        byte[] headerBytes = new byte[2];
-        int bytesRead = ByteStreams.read(
-            stream /* source */, headerBytes /* dest */, 0 /* offset */, 2 /* len */);
-        stream.unread(headerBytes, 0, bytesRead);
-        if (bytesRead >= 2) {
-          byte zero = 0x00;
-          int header = Ints.fromBytes(zero, zero, headerBytes[1], headerBytes[0]);
-          if (header == GZIPInputStream.GZIP_MAGIC) {
-            return Channels.newChannel(new GzipCompressorInputStream(stream, true));
-          }
-        }
-        return Channels.newChannel(stream);
-      }
-    },
-
-    /**
-     * Reads a byte channel assuming it is compressed with bzip2.
-     */
-    BZIP2 {
-      @Override
-      public boolean matches(String fileName) {
-          return fileName.toLowerCase().endsWith(".bz2");
-      }
+    /** @see Compression#UNCOMPRESSED */
+    UNCOMPRESSED(Compression.UNCOMPRESSED),
 
-      @Override
-      public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
-          throws IOException {
-        return Channels.newChannel(
-            new BZip2CompressorInputStream(Channels.newInputStream(channel), true));
-      }
-    },
+    /** @see Compression#AUTO */
+    AUTO(Compression.AUTO),
 
-    /**
-     * Reads a byte channel assuming it is compressed with zip.
-     * If the zip file contains multiple entries, files in the zip are concatenated all together.
-     */
-    ZIP {
-      @Override
-      public boolean matches(String fileName) {
-        return fileName.toLowerCase().endsWith(".zip");
-      }
+    /** @see Compression#GZIP */
+    GZIP(Compression.GZIP),
 
-      public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
-        throws IOException {
-        FullZipInputStream zip = new FullZipInputStream(Channels.newInputStream(channel));
-        return Channels.newChannel(zip);
-      }
-    },
+    /** @see Compression#BZIP2 */
+    BZIP2(Compression.BZIP2),
 
-    /**
-     * Reads a byte channel assuming it is compressed with deflate.
-     */
-    DEFLATE {
-      @Override
-      public boolean matches(String fileName) {
-        return fileName.toLowerCase().endsWith(".deflate");
-      }
-
-      public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
-          throws IOException {
-        return Channels.newChannel(
-            new DeflateCompressorInputStream(Channels.newInputStream(channel)));
-      }
-    };
+    /** @see Compression#ZIP */
+    ZIP(Compression.ZIP),
 
-    /**
-     * Extend of {@link ZipInputStream} to automatically read all entries in the zip.
-     */
-    private static class FullZipInputStream extends InputStream {
+    /** @see Compression#DEFLATE */
+    DEFLATE(Compression.DEFLATE);
 
-      private ZipInputStream zipInputStream;
-      private ZipEntry currentEntry;
-
-      public FullZipInputStream(InputStream is) throws IOException {
-        super();
-        zipInputStream = new ZipInputStream(is);
-        currentEntry = zipInputStream.getNextEntry();
-      }
-
-      @Override
-      public int read() throws IOException {
-        int result = zipInputStream.read();
-        while (result == -1) {
-          currentEntry = zipInputStream.getNextEntry();
-          if (currentEntry == null) {
-            return -1;
-          } else {
-            result = zipInputStream.read();
-          }
-        }
-        return result;
-      }
-
-      @Override
-      public int read(byte[] b, int off, int len) throws IOException {
-        int result = zipInputStream.read(b, off, len);
-        while (result == -1) {
-          currentEntry = zipInputStream.getNextEntry();
-          if (currentEntry == null) {
-            return -1;
-          } else {
-            result = zipInputStream.read(b, off, len);
-          }
-        }
-        return result;
-      }
+    private Compression canonical;
 
+    CompressionMode(Compression canonical) {
+      this.canonical = canonical;
     }
 
     /**
      * Returns {@code true} if the given file name implies that the contents are compressed
      * according to the compression embodied by this factory.
      */
-    public abstract boolean matches(String fileName);
+    public boolean matches(String fileName) {
+      return canonical.matches(fileName);
+    }
 
     @Override
-    public abstract ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
-        throws IOException;
+    public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
+        throws IOException {
+      return canonical.readDecompressed(channel);
+    }
 
     /** Returns whether the file's extension matches of one of the known compression formats. */
     public static boolean isCompressed(String filename) {
-      for (CompressionMode type : CompressionMode.values()) {
-        if  (type.matches(filename)) {
-          return true;
-        }
-      }
-      return false;
+      return Compression.AUTO.isCompressed(filename);
     }
-  }
 
-  /**
-   * Reads a byte channel detecting compression according to the file name. If the filename
-   * is not any other known {@link CompressionMode}, it is presumed to be uncompressed.
-   */
-  private static class DecompressAccordingToFilename
-      implements FileNameBasedDecompressingChannelFactory {
+    static DecompressingChannelFactory fromCanonical(Compression compression) {
+      switch (compression) {
+        case AUTO:
+          return AUTO;
 
-    @Override
-    public ReadableByteChannel createDecompressingChannel(
-        String fileName, ReadableByteChannel channel) throws IOException {
-      for (CompressionMode type : CompressionMode.values()) {
-        if (type.matches(fileName)) {
-          return type.createDecompressingChannel(channel);
-        }
-      }
-      // Uncompressed
-      return channel;
-    }
+        case UNCOMPRESSED:
+          return UNCOMPRESSED;
 
-    @Override
-    public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel) {
-      throw new UnsupportedOperationException(
-          String.format("%s does not support createDecompressingChannel(%s) but only"
-              + " createDecompressingChannel(%s,%s)",
-              getClass().getSimpleName(),
-              String.class.getSimpleName(),
-              ReadableByteChannel.class.getSimpleName(),
-              ReadableByteChannel.class.getSimpleName()));
+        case GZIP:
+          return GZIP;
+
+        case BZIP2:
+          return BZIP2;
+
+        case ZIP:
+          return ZIP;
+
+        case DEFLATE:
+          return DEFLATE;
+
+        default:
+          throw new IllegalArgumentException("Unsupported compression type: " + compression);
+      }
     }
   }
 
@@ -288,7 +154,7 @@ public class CompressedSource<T> extends FileBasedSource<T> {
    * configured via {@link CompressedSource#withDecompression}.
    */
   public static <T> CompressedSource<T> from(FileBasedSource<T> sourceDelegate) {
-    return new CompressedSource<>(sourceDelegate, new DecompressAccordingToFilename());
+    return new CompressedSource<>(sourceDelegate, CompressionMode.AUTO);
   }
 
   /**
@@ -299,6 +165,11 @@ public class CompressedSource<T> extends FileBasedSource<T> {
     return new CompressedSource<>(this.sourceDelegate, channelFactory);
   }
 
+  /** Like {@link #withDecompression} but takes a canonical {@link Compression}. */
+  public CompressedSource<T> withCompression(Compression compression) {
+    return withDecompression(CompressionMode.fromCanonical(compression));
+  }
+
   /**
    * Creates a {@code CompressedSource} from a delegate file based source and a decompressing
    * channel factory.
@@ -359,10 +230,21 @@ public class CompressedSource<T> extends FileBasedSource<T> {
    * from the requested file name that the file is not compressed.
    */
   @Override
-  protected final boolean isSplittable() throws Exception {
-    return channelFactory instanceof FileNameBasedDecompressingChannelFactory
-        && !CompressionMode.isCompressed(getFileOrPatternSpec())
-        && sourceDelegate.isSplittable();
+  protected final boolean isSplittable() {
+    try {
+      if (!sourceDelegate.isSplittable()) {
+        return false;
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    if (channelFactory == CompressionMode.UNCOMPRESSED) {
+      return true;
+    }
+    if (channelFactory == CompressionMode.AUTO) {
+      return !Compression.AUTO.isCompressed(getFileOrPatternSpec());
+    }
+    return false;
   }
 
   /**
@@ -375,10 +257,8 @@ public class CompressedSource<T> extends FileBasedSource<T> {
    */
   @Override
   protected final FileBasedReader<T> createSingleFileReader(PipelineOptions options) {
-    if (channelFactory instanceof FileNameBasedDecompressingChannelFactory) {
-      if (!CompressionMode.isCompressed(getFileOrPatternSpec())) {
-        return sourceDelegate.createSingleFileReader(options);
-      }
+    if (isSplittable()) {
+      return sourceDelegate.createSingleFileReader(options);
     }
     return new CompressedReader<T>(
         this, sourceDelegate.createSingleFileReader(options));
@@ -423,19 +303,19 @@ public class CompressedSource<T> extends FileBasedSource<T> {
   public static class CompressedReader<T> extends FileBasedReader<T> {
 
     private final FileBasedReader<T> readerDelegate;
-    private final CompressedSource<T> source;
     private final Object progressLock = new Object();
     @GuardedBy("progressLock")
     private int numRecordsRead;
     @GuardedBy("progressLock")
     private CountingChannel channel;
+    private DecompressingChannelFactory channelFactory;
 
     /**
      * Create a {@code CompressedReader} from a {@code CompressedSource} and delegate reader.
      */
     public CompressedReader(CompressedSource<T> source, FileBasedReader<T> readerDelegate) {
       super(source);
-      this.source = source;
+      this.channelFactory = source.getChannelFactory();
       this.readerDelegate = readerDelegate;
     }
 
@@ -525,14 +405,12 @@ public class CompressedSource<T> extends FileBasedSource<T> {
         channel = this.channel;
       }
 
-      if (source.getChannelFactory() instanceof FileNameBasedDecompressingChannelFactory) {
-        FileNameBasedDecompressingChannelFactory channelFactory =
-            (FileNameBasedDecompressingChannelFactory) source.getChannelFactory();
-        readerDelegate.startReading(channelFactory.createDecompressingChannel(
-            getCurrentSource().getFileOrPatternSpec(),
-            channel));
+      if (channelFactory == CompressionMode.AUTO) {
+        readerDelegate.startReading(
+            Compression.detect(getCurrentSource().getFileOrPatternSpec())
+                .readDecompressed(channel));
       } else {
-        readerDelegate.startReading(source.getChannelFactory().createDecompressingChannel(
+        readerDelegate.startReading(channelFactory.createDecompressingChannel(
             channel));
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Compression.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Compression.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Compression.java
new file mode 100644
index 0000000..bb40ed4
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Compression.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.primitives.Ints;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PushbackInputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
+import java.util.List;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
+import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream;
+import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+
+/** Various compression types for reading/writing files. */
+public enum Compression {
+  /**
+   * When reading a file, automatically determine the compression type based on filename extension.
+   * Not applicable when writing files.
+   */
+  AUTO("") {
+    @Override
+    public ReadableByteChannel readDecompressed(ReadableByteChannel channel) {
+      throw new UnsupportedOperationException(
+          "Must resolve compression into a concrete value before calling readDecompressed()");
+    }
+
+    @Override
+    public WritableByteChannel writeCompressed(WritableByteChannel channel) {
+      throw new UnsupportedOperationException("AUTO is applicable only to reading files");
+    }
+  },
+
+  /** No compression. */
+  UNCOMPRESSED("") {
+    @Override
+    public ReadableByteChannel readDecompressed(ReadableByteChannel channel) {
+      return channel;
+    }
+
+    @Override
+    public WritableByteChannel writeCompressed(WritableByteChannel channel) {
+      return channel;
+    }
+  },
+
+  /** GZip compression. */
+  GZIP(".gz", ".gz") {
+    @Override
+    public ReadableByteChannel readDecompressed(ReadableByteChannel channel) throws IOException {
+      // Determine if the input stream is gzipped. The input stream returned from the
+      // GCS connector may already be decompressed; GCS does this based on the
+      // content-encoding property.
+      PushbackInputStream stream = new PushbackInputStream(Channels.newInputStream(channel), 2);
+      byte[] headerBytes = new byte[2];
+      int bytesRead =
+          ByteStreams.read(
+              stream /* source */, headerBytes /* dest */, 0 /* offset */, 2 /* len */);
+      stream.unread(headerBytes, 0, bytesRead);
+      if (bytesRead >= 2) {
+        byte zero = 0x00;
+        int header = Ints.fromBytes(zero, zero, headerBytes[1], headerBytes[0]);
+        if (header == GZIPInputStream.GZIP_MAGIC) {
+          return Channels.newChannel(new GzipCompressorInputStream(stream, true));
+        }
+      }
+      return Channels.newChannel(stream);
+    }
+
+    @Override
+    public WritableByteChannel writeCompressed(WritableByteChannel channel) throws IOException {
+      return Channels.newChannel(new GZIPOutputStream(Channels.newOutputStream(channel), true));
+    }
+  },
+
+  /** BZip compression. */
+  BZIP2(".bz2", ".bz2") {
+    @Override
+    public ReadableByteChannel readDecompressed(ReadableByteChannel channel) throws IOException {
+      return Channels.newChannel(
+          new BZip2CompressorInputStream(Channels.newInputStream(channel), true));
+    }
+
+    @Override
+    public WritableByteChannel writeCompressed(WritableByteChannel channel) throws IOException {
+      return Channels.newChannel(
+          new BZip2CompressorOutputStream(Channels.newOutputStream(channel)));
+    }
+  },
+
+  /** Zip compression. */
+  ZIP(".zip", ".zip") {
+    @Override
+    public ReadableByteChannel readDecompressed(ReadableByteChannel channel) throws IOException {
+      FullZipInputStream zip = new FullZipInputStream(Channels.newInputStream(channel));
+      return Channels.newChannel(zip);
+    }
+
+    @Override
+    public WritableByteChannel writeCompressed(WritableByteChannel channel) throws IOException {
+      throw new UnsupportedOperationException("Writing ZIP files is currently unsupported");
+    }
+  },
+
+  /** Deflate compression. */
+  DEFLATE(".deflate", ".deflate", ".zlib") {
+    @Override
+    public ReadableByteChannel readDecompressed(ReadableByteChannel channel) throws IOException {
+      return Channels.newChannel(
+          new DeflateCompressorInputStream(Channels.newInputStream(channel)));
+    }
+
+    @Override
+    public WritableByteChannel writeCompressed(WritableByteChannel channel) throws IOException {
+      return Channels.newChannel(
+          new DeflateCompressorOutputStream(Channels.newOutputStream(channel)));
+    }
+  };
+
+  private final String suggestedSuffix;
+  private final List<String> detectedSuffixes;
+
+  Compression(String suggestedSuffix, String... detectedSuffixes) {
+    this.suggestedSuffix = suggestedSuffix;
+    this.detectedSuffixes = Arrays.asList(detectedSuffixes);
+  }
+
+  public String getSuggestedSuffix() {
+    return suggestedSuffix;
+  }
+
+  public boolean matches(String filename) {
+    for (String suffix : detectedSuffixes) {
+      if (filename.toLowerCase().endsWith(suffix)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public boolean isCompressed(String filename) {
+    Compression compression = this;
+    if (compression == AUTO) {
+      compression = detect(filename);
+    }
+    return compression != UNCOMPRESSED;
+  }
+
+  public static Compression detect(String filename) {
+    for (Compression value : values()) {
+      if (value.matches(filename)) {
+        return value;
+      }
+    }
+    return UNCOMPRESSED;
+  }
+
+  public abstract ReadableByteChannel readDecompressed(ReadableByteChannel channel)
+      throws IOException;
+
+  public abstract WritableByteChannel writeCompressed(WritableByteChannel channel)
+      throws IOException;
+
+  /** Concatenates all {@link ZipInputStream}s contained within the zip file. */
+  private static class FullZipInputStream extends InputStream {
+    private ZipInputStream zipInputStream;
+    private ZipEntry currentEntry;
+
+    public FullZipInputStream(InputStream is) throws IOException {
+      super();
+      zipInputStream = new ZipInputStream(is);
+      currentEntry = zipInputStream.getNextEntry();
+    }
+
+    @Override
+    public int read() throws IOException {
+      int result = zipInputStream.read();
+      while (result == -1) {
+        currentEntry = zipInputStream.getNextEntry();
+        if (currentEntry == null) {
+          return -1;
+        } else {
+          result = zipInputStream.read();
+        }
+      }
+      return result;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      int result = zipInputStream.read(b, off, len);
+      while (result == -1) {
+        currentEntry = zipInputStream.getNextEntry();
+        if (currentEntry == null) {
+          return -1;
+        } else {
+          result = zipInputStream.read(b, off, len);
+        }
+      }
+      return result;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index d618647..39f7868 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -36,7 +36,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -47,7 +46,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.zip.GZIPOutputStream;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -79,8 +77,6 @@ import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeDescriptors.TypeVariableExtractor;
-import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
-import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
 import org.joda.time.Instant;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
@@ -128,56 +124,66 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
     implements Serializable, HasDisplayData {
   private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);
 
-  /** Directly supported file output compression types. */
+  /** @deprecated use {@link Compression}. */
+  @Deprecated
   public enum CompressionType implements WritableByteChannelFactory {
-    /** No compression, or any other transformation, will be used. */
-    UNCOMPRESSED("", null) {
-      @Override
-      public WritableByteChannel create(WritableByteChannel channel) throws IOException {
-        return channel;
-      }
-    },
-    /** Provides GZip output transformation. */
-    GZIP(".gz", MimeTypes.BINARY) {
-      @Override
-      public WritableByteChannel create(WritableByteChannel channel) throws IOException {
-        return Channels.newChannel(new GZIPOutputStream(Channels.newOutputStream(channel), true));
-      }
-    },
-    /** Provides BZip2 output transformation. */
-    BZIP2(".bz2", MimeTypes.BINARY) {
-      @Override
-      public WritableByteChannel create(WritableByteChannel channel) throws IOException {
-        return Channels.newChannel(
-            new BZip2CompressorOutputStream(Channels.newOutputStream(channel)));
-      }
-    },
-    /** Provides deflate output transformation. */
-    DEFLATE(".deflate", MimeTypes.BINARY) {
-      @Override
-      public WritableByteChannel create(WritableByteChannel channel) throws IOException {
-        return Channels.newChannel(
-            new DeflateCompressorOutputStream(Channels.newOutputStream(channel)));
-      }
-    };
+    /** @see Compression#UNCOMPRESSED */
+    UNCOMPRESSED(Compression.UNCOMPRESSED),
 
-    private String filenameSuffix;
-    @Nullable private String mimeType;
+    /** @see Compression#GZIP */
+    GZIP(Compression.GZIP),
 
-    CompressionType(String suffix, @Nullable String mimeType) {
-      this.filenameSuffix = suffix;
-      this.mimeType = mimeType;
+    /** @see Compression#BZIP2 */
+    BZIP2(Compression.BZIP2),
+
+    /** @see Compression#DEFLATE */
+    DEFLATE(Compression.DEFLATE);
+
+    private Compression canonical;
+
+    CompressionType(Compression canonical) {
+      this.canonical = canonical;
     }
 
     @Override
     public String getSuggestedFilenameSuffix() {
-      return filenameSuffix;
+      return canonical.getSuggestedSuffix();
     }
 
     @Override
     @Nullable
     public String getMimeType() {
-      return mimeType;
+      return (canonical == Compression.UNCOMPRESSED) ? null : MimeTypes.BINARY;
+    }
+
+    @Override
+    public WritableByteChannel create(WritableByteChannel channel) throws IOException {
+      return canonical.writeCompressed(channel);
+    }
+
+    public static CompressionType fromCanonical(Compression canonical) {
+      switch(canonical) {
+        case AUTO:
+          throw new IllegalArgumentException("AUTO is not supported for writing");
+
+        case UNCOMPRESSED:
+          return UNCOMPRESSED;
+
+        case GZIP:
+          return GZIP;
+
+        case BZIP2:
+          return BZIP2;
+
+        case ZIP:
+          throw new IllegalArgumentException("ZIP is unsupported");
+
+        case DEFLATE:
+          return DEFLATE;
+
+        default:
+          throw new UnsupportedOperationException("Unsupported compression type: " + canonical);
+      }
     }
   }
 
@@ -208,7 +214,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
   /**
    * The {@link WritableByteChannelFactory} that is used to wrap the raw data output to the
    * underlying channel. The default is to not compress the output using {@link
-   * CompressionType#UNCOMPRESSED}.
+   * Compression#UNCOMPRESSED}.
    */
   private final WritableByteChannelFactory writableByteChannelFactory;
 
@@ -328,7 +334,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
      * When a sink has requested windowed or triggered output, this method will be invoked to return
      * the file {@link ResourceId resource} to be created given the base output directory and a
      * {@link OutputFileHints} containing information about the file, including a suggested
-     * extension (e.g. coming from {@link CompressionType}).
+     * extension (e.g. coming from {@link Compression}).
      *
      * <p>The policy must return unique and consistent filenames for different windows and panes.
      */
@@ -344,7 +350,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
      * When a sink has not requested windowed or triggered output, this method will be invoked to
      * return the file {@link ResourceId resource} to be created given the base output directory and
      * a {@link OutputFileHints} containing information about the file, including a suggested (e.g.
-     * coming from {@link CompressionType}).
+     * coming from {@link Compression}).
      *
      * <p>The shardNumber and numShards parameters, should be used by the policy to generate unique
      * and consistent filenames.
@@ -375,7 +381,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
   public FileBasedSink(
       ValueProvider<ResourceId> tempDirectoryProvider,
       DynamicDestinations<?, DestinationT, OutputT> dynamicDestinations) {
-    this(tempDirectoryProvider, dynamicDestinations, CompressionType.UNCOMPRESSED);
+    this(tempDirectoryProvider, dynamicDestinations, Compression.UNCOMPRESSED);
   }
 
   /** Construct a {@link FileBasedSink} with the given temp directory and output channel type. */
@@ -390,6 +396,15 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
     this.writableByteChannelFactory = writableByteChannelFactory;
   }
 
+  /** Construct a {@link FileBasedSink} with the given temp directory and output channel type. */
+  @Experimental(Kind.FILESYSTEM)
+  public FileBasedSink(
+      ValueProvider<ResourceId> tempDirectoryProvider,
+      DynamicDestinations<?, DestinationT, OutputT> dynamicDestinations,
+      Compression compression) {
+    this(tempDirectoryProvider, dynamicDestinations, CompressionType.fromCanonical(compression));
+  }
+
   /** Return the {@link DynamicDestinations} used. */
   @SuppressWarnings("unchecked")
   public DynamicDestinations<UserT, DestinationT, OutputT> getDynamicDestinations() {
@@ -799,7 +814,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
      *
      * <p>This is the default for the sink, but it may be overridden by a supplied {@link
      * WritableByteChannelFactory}. For example, {@link TextIO.Write} uses {@link MimeTypes#TEXT} by
-     * default but if {@link CompressionType#BZIP2} is set then the MIME type will be overridden to
+     * default but if {@link Compression#BZIP2} is set then the MIME type will be overridden to
      * {@link MimeTypes#BINARY}.
      */
     private final String mimeType;
@@ -1134,7 +1149,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
     /**
      * Returns the MIME type that should be used for the files that will hold the output data. May
      * return {@code null} if this {@code WritableByteChannelFactory} does not meaningfully change
-     * the MIME type (e.g., for {@link CompressionType#UNCOMPRESSED}).
+     * the MIME type (e.g., for {@link Compression#UNCOMPRESSED}).
      *
      * @see MimeTypes
      * @see <a href=
@@ -1144,7 +1159,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
     String getMimeType();
 
     /**
-     * @return an optional filename suffix, eg, ".gz" is returned by {@link CompressionType#GZIP}
+     * @return an optional filename suffix, eg, ".gz" is returned for {@link Compression#GZIP}
      */
     @Nullable
     String getSuggestedFilenameSuffix();

http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index 526c50e..ddedd00 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -42,7 +42,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.PBegin;
@@ -64,7 +63,7 @@ public class TFRecordIO {
   public static Read read() {
     return new AutoValue_TFRecordIO_Read.Builder()
         .setValidate(true)
-        .setCompressionType(CompressionType.AUTO)
+        .setCompression(Compression.AUTO)
         .build();
   }
 
@@ -78,7 +77,7 @@ public class TFRecordIO {
         .setShardTemplate(null)
         .setFilenameSuffix(null)
         .setNumShards(0)
-        .setCompressionType(CompressionType.NONE)
+        .setCompression(Compression.UNCOMPRESSED)
         .build();
   }
 
@@ -90,7 +89,7 @@ public class TFRecordIO {
 
     abstract boolean getValidate();
 
-    abstract CompressionType getCompressionType();
+    abstract Compression getCompression();
 
     abstract Builder toBuilder();
 
@@ -98,7 +97,7 @@ public class TFRecordIO {
     abstract static class Builder {
       abstract Builder setFilepattern(ValueProvider<String> filepattern);
       abstract Builder setValidate(boolean validate);
-      abstract Builder setCompressionType(CompressionType compressionType);
+      abstract Builder setCompression(Compression compression);
 
       abstract Read build();
     }
@@ -134,18 +133,22 @@ public class TFRecordIO {
       return toBuilder().setValidate(false).build();
     }
 
+    /** @deprecated Use {@link #withCompression}. */
+    @Deprecated
+    public Read withCompressionType(TFRecordIO.CompressionType compressionType) {
+      return withCompression(compressionType.canonical);
+    }
+
     /**
-     * Returns a transform for reading TFRecord files that decompresses all input files
-     * using the specified compression type.
+     * Returns a transform for reading TFRecord files that decompresses all input files using the
+     * specified compression type.
      *
-     * <p>If no compression type is specified, the default is
-     * {@link TFRecordIO.CompressionType#AUTO}.
-     * In this mode, the compression type of the file is determined by its extension
-     * (e.g., {@code *.gz} is gzipped, {@code *.zlib} is zlib compressed, and all other
-     * extensions are uncompressed).
+     * <p>If no compression type is specified, the default is {@link Compression#AUTO}. In this
+     * mode, the compression type of the file is determined by its extension via {@link
+     * Compression#detect(String)}.
      */
-    public Read withCompressionType(TFRecordIO.CompressionType compressionType) {
-      return toBuilder().setCompressionType(compressionType).build();
+    public Read withCompression(Compression compression) {
+      return toBuilder().setCompression(compression).build();
     }
 
     @Override
@@ -174,29 +177,15 @@ public class TFRecordIO {
 
     // Helper to create a source specific to the requested compression type.
     protected FileBasedSource<byte[]> getSource() {
-      switch (getCompressionType()) {
-        case NONE:
-          return new TFRecordSource(getFilepattern());
-        case AUTO:
-          return CompressedSource.from(new TFRecordSource(getFilepattern()));
-        case GZIP:
-          return
-              CompressedSource.from(new TFRecordSource(getFilepattern()))
-                  .withDecompression(CompressedSource.CompressionMode.GZIP);
-        case ZLIB:
-          return
-              CompressedSource.from(new TFRecordSource(getFilepattern()))
-                  .withDecompression(CompressedSource.CompressionMode.DEFLATE);
-        default:
-          throw new IllegalArgumentException("Unknown compression type: " + getCompressionType());
-      }
+      return CompressedSource.from(new TFRecordSource(getFilepattern()))
+          .withCompression(getCompression());
     }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
       builder
-          .add(DisplayData.item("compressionType", getCompressionType().toString())
+          .add(DisplayData.item("compressionType", getCompression().toString())
               .withLabel("Compression Type"))
           .addIfNotDefault(DisplayData.item("validation", getValidate())
               .withLabel("Validation Enabled"), true)
@@ -223,7 +212,7 @@ public class TFRecordIO {
     @Nullable abstract String getShardTemplate();
 
     /** Option to indicate the output sink's compression type. Default is NONE. */
-    abstract CompressionType getCompressionType();
+    abstract Compression getCompression();
 
     abstract Builder toBuilder();
 
@@ -237,7 +226,7 @@ public class TFRecordIO {
 
       abstract Builder setNumShards(int numShards);
 
-      abstract Builder setCompressionType(CompressionType compressionType);
+      abstract Builder setCompression(Compression compression);
 
       abstract Write build();
     }
@@ -326,15 +315,20 @@ public class TFRecordIO {
       return withNumShards(1).withShardNameTemplate("");
     }
 
+    /** @deprecated use {@link #withCompression}. */
+    @Deprecated
+    public Write withCompressionType(CompressionType compressionType) {
+      return withCompression(compressionType.canonical);
+    }
+
     /**
      * Writes to output files using the specified compression type.
      *
-     * <p>If no compression type is specified, the default is
-     * {@link TFRecordIO.CompressionType#NONE}.
-     * See {@link TFRecordIO.Read#withCompressionType} for more details.
+     * <p>If no compression type is specified, the default is {@link Compression#UNCOMPRESSED}. See
+     * {@link TFRecordIO.Read#withCompression} for more details.
      */
-    public Write withCompressionType(CompressionType compressionType) {
-      return toBuilder().setCompressionType(compressionType).build();
+    public Write withCompression(Compression compression) {
+      return toBuilder().setCompression(compression).build();
     }
 
     @Override
@@ -347,7 +341,7 @@ public class TFRecordIO {
                   getOutputPrefix(),
                   getShardTemplate(),
                   getFilenameSuffix(),
-                  getCompressionType()));
+                  getCompression()));
       if (getNumShards() > 0) {
         write = write.withNumShards(getNumShards());
       }
@@ -366,45 +360,35 @@ public class TFRecordIO {
                   .withLabel("Output Shard Name Template"))
           .addIfNotDefault(DisplayData.item("numShards", getNumShards())
               .withLabel("Maximum Output Shards"), 0)
-          .add(DisplayData.item("compressionType", getCompressionType().toString())
+          .add(DisplayData.item("compressionType", getCompression().toString())
               .withLabel("Compression Type"));
     }
   }
 
-  /**
-   * Possible TFRecord file compression types.
-   */
+  /** @deprecated Use {@link Compression}. */
+  @Deprecated
   public enum CompressionType {
-    /**
-     * Automatically determine the compression type based on filename extension.
-     */
-    AUTO(""),
-    /**
-     * Uncompressed.
-     */
-    NONE(""),
-    /**
-     * GZipped.
-     */
-    GZIP(".gz"),
-    /**
-     * ZLIB compressed.
-     */
-    ZLIB(".zlib");
+    /** @see Compression#AUTO */
+    AUTO(Compression.AUTO),
+
+    /** @see Compression#UNCOMPRESSED */
+    NONE(Compression.UNCOMPRESSED),
+
+    /** @see Compression#GZIP */
+    GZIP(Compression.GZIP),
+
+    /** @see Compression#DEFLATE */
+    ZLIB(Compression.DEFLATE);
 
-    private String filenameSuffix;
+    private Compression canonical;
 
-    CompressionType(String suffix) {
-      this.filenameSuffix = suffix;
+    CompressionType(Compression canonical) {
+      this.canonical = canonical;
     }
 
-    /**
-     * Determine if a given filename matches a compression type based on its extension.
-     * @param filename the filename to match
-     * @return true iff the filename ends with the compression type's known extension.
-     */
+    /** @see Compression#matches */
     public boolean matches(String filename) {
-      return filename.toLowerCase().endsWith(filenameSuffix.toLowerCase());
+      return canonical.matches(filename);
     }
   }
 
@@ -419,11 +403,6 @@ public class TFRecordIO {
   @VisibleForTesting
   static class TFRecordSource extends FileBasedSource<byte[]> {
     @VisibleForTesting
-    TFRecordSource(String fileSpec) {
-      super(StaticValueProvider.of(fileSpec), 1L);
-    }
-
-    @VisibleForTesting
     TFRecordSource(ValueProvider<String> fileSpec) {
       super(fileSpec, Long.MAX_VALUE);
     }
@@ -452,7 +431,7 @@ public class TFRecordIO {
     }
 
     @Override
-    protected boolean isSplittable() throws Exception {
+    protected boolean isSplittable() {
       // TFRecord files are not splittable
       return false;
     }
@@ -528,20 +507,13 @@ public class TFRecordIO {
         ValueProvider<ResourceId> outputPrefix,
         @Nullable String shardTemplate,
         @Nullable String suffix,
-        TFRecordIO.CompressionType compressionType) {
+        Compression compression) {
       super(
           outputPrefix,
           DynamicFileDestinations.<byte[]>constant(
               DefaultFilenamePolicy.fromStandardParameters(
                   outputPrefix, shardTemplate, suffix, false)),
-          writableByteChannelFactory(compressionType));
-    }
-
-    private static class ExtractDirectory implements SerializableFunction<ResourceId, ResourceId> {
-      @Override
-      public ResourceId apply(ResourceId input) {
-        return input.getCurrentDirectory();
-      }
+          compression);
     }
 
     @Override
@@ -549,21 +521,6 @@ public class TFRecordIO {
       return new TFRecordWriteOperation(this);
     }
 
-    private static WritableByteChannelFactory writableByteChannelFactory(
-        TFRecordIO.CompressionType compressionType) {
-      switch (compressionType) {
-        case AUTO:
-          throw new IllegalArgumentException("Unsupported compression type AUTO");
-        case NONE:
-          return CompressionType.UNCOMPRESSED;
-        case GZIP:
-          return CompressionType.GZIP;
-        case ZLIB:
-          return CompressionType.DEFLATE;
-      }
-      return CompressionType.UNCOMPRESSED;
-    }
-
     /** A {@link WriteOperation WriteOperation} for TFRecord files. */
     private static class TFRecordWriteOperation extends WriteOperation<Void, byte[]> {
       private TFRecordWriteOperation(TFRecordSink sink) {

http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 442e4d9..76102cb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -32,7 +32,6 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.CompressedSource.CompressionMode;
 import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
 import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
@@ -135,7 +134,7 @@ import org.joda.time.Duration;
  * PCollection<String> lines = ...;
  * lines.apply(TextIO.write().to("/path/to/file.txt"))
  *      .withSuffix(".txt")
- *      .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP));
+ *      .withCompression(Compression.GZIP));
  * }</pre>
  *
  * <p>Any existing files with the same names as generated output files will be overwritten.
@@ -188,7 +187,7 @@ public class TextIO {
    */
   public static Read read() {
     return new AutoValue_TextIO_Read.Builder()
-        .setCompressionType(CompressionType.AUTO)
+        .setCompression(Compression.AUTO)
         .setHintMatchesManyFiles(false)
         .setEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)
         .build();
@@ -206,7 +205,7 @@ public class TextIO {
    */
   public static ReadAll readAll() {
     return new AutoValue_TextIO_ReadAll.Builder()
-        .setCompressionType(CompressionType.AUTO)
+        .setCompression(Compression.AUTO)
         // 64MB is a reasonable value that allows to amortize the cost of opening files,
         // but is not so large as to exhaust a typical runner's maximum amount of output per
         // ProcessElement call.
@@ -257,7 +256,7 @@ public class TextIO {
   @AutoValue
   public abstract static class Read extends PTransform<PBegin, PCollection<String>> {
     @Nullable abstract ValueProvider<String> getFilepattern();
-    abstract CompressionType getCompressionType();
+    abstract Compression getCompression();
 
     @Nullable
     abstract Duration getWatchForNewFilesInterval();
@@ -273,7 +272,7 @@ public class TextIO {
     @AutoValue.Builder
     abstract static class Builder {
       abstract Builder setFilepattern(ValueProvider<String> filepattern);
-      abstract Builder setCompressionType(CompressionType compressionType);
+      abstract Builder setCompression(Compression compression);
       abstract Builder setWatchForNewFilesInterval(Duration watchForNewFilesInterval);
       abstract Builder setWatchForNewFilesTerminationCondition(
               TerminationCondition<?, ?> condition);
@@ -307,13 +306,19 @@ public class TextIO {
       return toBuilder().setFilepattern(filepattern).build();
     }
 
+    /** @deprecated Use {@link #withCompression}. */
+    @Deprecated
+    public Read withCompressionType(TextIO.CompressionType compressionType) {
+      return withCompression(compressionType.canonical);
+    }
+
     /**
      * Reads from input sources using the specified compression type.
      *
-     * <p>If no compression type is specified, the default is {@link TextIO.CompressionType#AUTO}.
+     * <p>If no compression type is specified, the default is {@link Compression#AUTO}.
      */
-    public Read withCompressionType(TextIO.CompressionType compressionType) {
-      return toBuilder().setCompressionType(compressionType).build();
+    public Read withCompression(Compression compression) {
+      return toBuilder().setCompression(compression).build();
     }
 
     /**
@@ -364,7 +369,7 @@ public class TextIO {
       // All other cases go through ReadAll.
       ReadAll readAll =
           readAll()
-              .withCompressionType(getCompressionType())
+              .withCompression(getCompression())
               .withEmptyMatchTreatment(getEmptyMatchTreatment());
       if (getWatchForNewFilesInterval() != null) {
         TerminationCondition<String, ?> readAllCondition =
@@ -378,37 +383,8 @@ public class TextIO {
 
     // Helper to create a source specific to the requested compression type.
     protected FileBasedSource<String> getSource() {
-      return wrapWithCompression(
-          new TextSource(getFilepattern(), getEmptyMatchTreatment()),
-          getCompressionType());
-    }
-
-    private static FileBasedSource<String> wrapWithCompression(
-        FileBasedSource<String> source, CompressionType compressionType) {
-      switch (compressionType) {
-        case UNCOMPRESSED:
-          return source;
-        case AUTO:
-          return CompressedSource.from(source);
-        case BZIP2:
-          return
-              CompressedSource.from(source)
-                  .withDecompression(CompressionMode.BZIP2);
-        case GZIP:
-          return
-              CompressedSource.from(source)
-                  .withDecompression(CompressionMode.GZIP);
-        case ZIP:
-          return
-              CompressedSource.from(source)
-                  .withDecompression(CompressionMode.ZIP);
-        case DEFLATE:
-          return
-              CompressedSource.from(source)
-                  .withDecompression(CompressionMode.DEFLATE);
-        default:
-          throw new IllegalArgumentException("Unknown compression type: " + compressionType);
-      }
+      return CompressedSource.from(new TextSource(getFilepattern(), getEmptyMatchTreatment()))
+          .withCompression(getCompression());
     }
 
     @Override
@@ -416,7 +392,7 @@ public class TextIO {
       super.populateDisplayData(builder);
       builder
           .add(
-              DisplayData.item("compressionType", getCompressionType().toString())
+              DisplayData.item("compressionType", getCompression().toString())
                   .withLabel("Compression Type"))
           .addIfNotNull(
               DisplayData.item("filePattern", getFilepattern()).withLabel("File Pattern"))
@@ -435,7 +411,7 @@ public class TextIO {
   @AutoValue
   public abstract static class ReadAll
       extends PTransform<PCollection<String>, PCollection<String>> {
-    abstract CompressionType getCompressionType();
+    abstract Compression getCompression();
 
     @Nullable
     abstract Duration getWatchForNewFilesInterval();
@@ -450,7 +426,7 @@ public class TextIO {
 
     @AutoValue.Builder
     abstract static class Builder {
-      abstract Builder setCompressionType(CompressionType compressionType);
+      abstract Builder setCompression(Compression compression);
       abstract Builder setWatchForNewFilesInterval(Duration watchForNewFilesInterval);
       abstract Builder setWatchForNewFilesTerminationCondition(
           TerminationCondition<String, ?> condition);
@@ -460,9 +436,19 @@ public class TextIO {
       abstract ReadAll build();
     }
 
-    /** Same as {@link Read#withCompressionType(CompressionType)}. */
-    public ReadAll withCompressionType(CompressionType compressionType) {
-      return toBuilder().setCompressionType(compressionType).build();
+    /** @deprecated Use {@link #withCompression}. */
+    @Deprecated
+    public ReadAll withCompressionType(TextIO.CompressionType compressionType) {
+      return withCompression(compressionType.canonical);
+    }
+
+    /**
+     * Reads from input sources using the specified compression type.
+     *
+     * <p>If no compression type is specified, the default is {@link Compression#AUTO}.
+     */
+    public ReadAll withCompression(Compression compression) {
+      return toBuilder().setCompression(compression).build();
     }
 
     /** Same as {@link Read#withEmptyMatchTreatment}. */
@@ -499,9 +485,9 @@ public class TextIO {
           .apply(
               "Read all via FileBasedSource",
               new ReadAllViaFileBasedSource<>(
-                  new IsSplittableFn(getCompressionType()),
+                  new IsSplittableFn(getCompression()),
                   getDesiredBundleSizeBytes(),
-                  new CreateTextSourceFn(getCompressionType(), getEmptyMatchTreatment())))
+                  new CreateTextSourceFn(getCompression(), getEmptyMatchTreatment())))
           .setCoder(StringUtf8Coder.of());
     }
 
@@ -510,39 +496,39 @@ public class TextIO {
       super.populateDisplayData(builder);
 
       builder.add(
-          DisplayData.item("compressionType", getCompressionType().toString())
+          DisplayData.item("compressionType", getCompression().toString())
               .withLabel("Compression Type"));
     }
 
     private static class CreateTextSourceFn
         implements SerializableFunction<String, FileBasedSource<String>> {
-      private final CompressionType compressionType;
+      private final Compression compression;
       private final EmptyMatchTreatment emptyMatchTreatment;
 
       private CreateTextSourceFn(
-          CompressionType compressionType, EmptyMatchTreatment emptyMatchTreatment) {
-        this.compressionType = compressionType;
+          Compression compression, EmptyMatchTreatment emptyMatchTreatment) {
+        this.compression = compression;
         this.emptyMatchTreatment = emptyMatchTreatment;
       }
 
       @Override
       public FileBasedSource<String> apply(String input) {
-        return Read.wrapWithCompression(
-            new TextSource(StaticValueProvider.of(input), emptyMatchTreatment), compressionType);
+        return CompressedSource.from(
+                new TextSource(StaticValueProvider.of(input), emptyMatchTreatment))
+            .withCompression(compression);
       }
     }
 
     private static class IsSplittableFn implements SerializableFunction<String, Boolean> {
-      private final CompressionType compressionType;
+      private final Compression compression;
 
-      private IsSplittableFn(CompressionType compressionType) {
-        this.compressionType = compressionType;
+      private IsSplittableFn(Compression compression) {
+        this.compression = compression;
       }
 
       @Override
       public Boolean apply(String filename) {
-        return compressionType == CompressionType.UNCOMPRESSED
-            || (compressionType == CompressionType.AUTO && !CompressionMode.isCompressed(filename));
+        return !compression.isCompressed(filename);
       }
     }
   }
@@ -811,7 +797,7 @@ public class TextIO {
     /**
      * Returns a transform for writing to text files like this one but that has the given {@link
      * WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output. The
-     * default is value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
+     * default is value is {@link Compression#UNCOMPRESSED}.
      *
      * <p>A {@code null} value will reset the value to the default value mentioned above.
      */
@@ -821,6 +807,16 @@ public class TextIO {
     }
 
     /**
+     * Returns a transform for writing to text files like this one but that compresses output using
+     * the given {@link Compression}. The default value is {@link Compression#UNCOMPRESSED}.
+     */
+    public TypedWrite<UserT> withCompression(Compression compression) {
+      checkArgument(compression != null, "compression can not be null");
+      return withWritableByteChannelFactory(
+          FileBasedSink.CompressionType.fromCanonical(compression));
+    }
+
+    /**
      * Preserves windowing of input elements and writes them to files based on the element's window.
      *
      * <p>If using {@link #to(FileBasedSink.FilenamePolicy)}. Filenames will be generated using
@@ -1063,48 +1059,36 @@ public class TextIO {
     }
   }
 
-  /**
-   * Possible text file compression types.
-   */
+  /** @deprecated Use {@link Compression}. */
+  @Deprecated
   public enum CompressionType {
-    /**
-     * Automatically determine the compression type based on filename extension.
-     */
-    AUTO(""),
-    /**
-     * Uncompressed (i.e., may be split).
-     */
-    UNCOMPRESSED(""),
-    /**
-     * GZipped.
-     */
-    GZIP(".gz"),
-    /**
-     * BZipped.
-     */
-    BZIP2(".bz2"),
-    /**
-     * Zipped.
-     */
-    ZIP(".zip"),
-    /**
-     * Deflate compressed.
-     */
-    DEFLATE(".deflate");
+    /** @see Compression#AUTO */
+    AUTO(Compression.AUTO),
+
+    /** @see Compression#UNCOMPRESSED */
+    UNCOMPRESSED(Compression.UNCOMPRESSED),
+
+    /** @see Compression#GZIP */
+    GZIP(Compression.GZIP),
+
+    /** @see Compression#BZIP2 */
+    BZIP2(Compression.BZIP2),
 
-    private String filenameSuffix;
+    /** @see Compression#ZIP */
+    ZIP(Compression.ZIP),
 
-    CompressionType(String suffix) {
-      this.filenameSuffix = suffix;
+    /** @see Compression#ZIP */
+    DEFLATE(Compression.DEFLATE);
+
+    private Compression canonical;
+
+    CompressionType(Compression canonical) {
+      this.canonical = canonical;
     }
 
-    /**
-     * Determine if a given filename matches a compression type based on its extension.
-     * @param filename the filename to match
-     * @return true iff the filename ends with the compression type's known extension.
-     */
+    /** @see Compression#matches */
     public boolean matches(String filename) {
-      return filename.toLowerCase().endsWith(filenameSuffix.toLowerCase());
+      return canonical.matches(filename);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
index fe6f01f..352d38a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
@@ -358,7 +358,7 @@ public class CompressedSourceTest {
   }
 
   @Test
-  public void testUncompressedFileIsSplittable() throws Exception {
+  public void testUncompressedFileWithAutoIsSplittable() throws Exception {
     String baseName = "test-input";
 
     File uncompressedFile = tmpFolder.newFile(baseName + ".bin");
@@ -370,6 +370,21 @@ public class CompressedSourceTest {
     SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create());
   }
 
+
+  @Test
+  public void testUncompressedFileWithUncompressedIsSplittable() throws Exception {
+    String baseName = "test-input";
+
+    File uncompressedFile = tmpFolder.newFile(baseName + ".bin");
+    Files.write(generateInput(10), uncompressedFile);
+
+    CompressedSource<Byte> source =
+        CompressedSource.from(new ByteSource(uncompressedFile.getPath(), 1))
+            .withDecompression(CompressionMode.UNCOMPRESSED);
+    assertTrue(source.isSplittable());
+    SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create());
+  }
+
   @Test
   public void testGzipFileIsNotSplittable() throws Exception {
     String baseName = "test-input";

http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index ff30e33..fd8ad80 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -48,7 +48,6 @@ import java.util.zip.GZIPInputStream;
 import org.apache.beam.sdk.io.FileBasedSink.CompressionType;
 import org.apache.beam.sdk.io.FileBasedSink.FileResult;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
-import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.FileBasedSink.WriteOperation;
 import org.apache.beam.sdk.io.FileBasedSink.Writer;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
@@ -229,7 +228,7 @@ public class FileBasedSinkTest {
     String prefix = "file";
     SimpleSink<Void> sink =
         SimpleSink.makeSimpleSink(
-            getBaseOutputDirectory(), prefix, "", "", CompressionType.UNCOMPRESSED);
+            getBaseOutputDirectory(), prefix, "", "", Compression.UNCOMPRESSED);
 
     WriteOperation<Void, String> writeOp =
         new SimpleSink.SimpleWriteOperation<>(sink, tempDirectory);
@@ -320,7 +319,7 @@ public class FileBasedSinkTest {
 
     SimpleSink<Void> sink =
         SimpleSink.makeSimpleSink(
-            root, "file", ".SSSSS.of.NNNNN", ".test", CompressionType.UNCOMPRESSED);
+            root, "file", ".SSSSS.of.NNNNN", ".test", Compression.UNCOMPRESSED);
     FilenamePolicy policy = sink.getDynamicDestinations().getFilenamePolicy(null);
 
     expected =
@@ -347,7 +346,7 @@ public class FileBasedSinkTest {
   public void testCollidingOutputFilenames() throws IOException {
     ResourceId root = getBaseOutputDirectory();
     SimpleSink<Void> sink =
-        SimpleSink.makeSimpleSink(root, "file", "-NN", "test", CompressionType.UNCOMPRESSED);
+        SimpleSink.makeSimpleSink(root, "file", "-NN", "test", Compression.UNCOMPRESSED);
     SimpleSink.SimpleWriteOperation<Void> writeOp = new SimpleSink.SimpleWriteOperation<>(sink);
 
     ResourceId temp1 = root.resolve("temp1", StandardResolveOptions.RESOLVE_FILE);
@@ -376,7 +375,7 @@ public class FileBasedSinkTest {
     ResourceId root = getBaseOutputDirectory();
     SimpleSink<Void> sink =
         SimpleSink.makeSimpleSink(
-            root, "file", "-SSSSS-of-NNNNN", "", CompressionType.UNCOMPRESSED);
+            root, "file", "-SSSSS-of-NNNNN", "", Compression.UNCOMPRESSED);
     FilenamePolicy policy = sink.getDynamicDestinations().getFilenamePolicy(null);
 
     expected =
@@ -398,11 +397,11 @@ public class FileBasedSinkTest {
     assertEquals(expected, actual);
   }
 
-  /** {@link CompressionType#BZIP2} correctly writes BZip2 data. */
+  /** {@link Compression#BZIP2} correctly writes BZip2 data. */
   @Test
-  public void testCompressionTypeBZIP2() throws FileNotFoundException, IOException {
+  public void testCompressionBZIP2() throws FileNotFoundException, IOException {
     final File file =
-        writeValuesWithWritableByteChannelFactory(CompressionType.BZIP2, "abc", "123");
+        writeValuesWithCompression(Compression.BZIP2, "abc", "123");
     // Read Bzip2ed data back in using Apache commons API (de facto standard).
     assertReadValues(
         new BufferedReader(
@@ -413,10 +412,10 @@ public class FileBasedSinkTest {
         "123");
   }
 
-  /** {@link CompressionType#GZIP} correctly writes Gzipped data. */
+  /** {@link Compression#GZIP} correctly writes Gzipped data. */
   @Test
-  public void testCompressionTypeGZIP() throws FileNotFoundException, IOException {
-    final File file = writeValuesWithWritableByteChannelFactory(CompressionType.GZIP, "abc", "123");
+  public void testCompressionGZIP() throws FileNotFoundException, IOException {
+    final File file = writeValuesWithCompression(Compression.GZIP, "abc", "123");
     // Read Gzipped data back in using standard API.
     assertReadValues(
         new BufferedReader(
@@ -426,11 +425,11 @@ public class FileBasedSinkTest {
         "123");
   }
 
-  /** {@link CompressionType#DEFLATE} correctly writes deflate data. */
+  /** {@link Compression#DEFLATE} correctly writes deflate data. */
   @Test
-  public void testCompressionTypeDEFLATE() throws FileNotFoundException, IOException {
+  public void testCompressionDEFLATE() throws FileNotFoundException, IOException {
     final File file =
-        writeValuesWithWritableByteChannelFactory(CompressionType.DEFLATE, "abc", "123");
+        writeValuesWithCompression(Compression.DEFLATE, "abc", "123");
     // Read Gzipped data back in using standard API.
     assertReadValues(
         new BufferedReader(
@@ -441,11 +440,11 @@ public class FileBasedSinkTest {
         "123");
   }
 
-  /** {@link CompressionType#UNCOMPRESSED} correctly writes uncompressed data. */
+  /** {@link Compression#UNCOMPRESSED} correctly writes uncompressed data. */
   @Test
-  public void testCompressionTypeUNCOMPRESSED() throws FileNotFoundException, IOException {
+  public void testCompressionUNCOMPRESSED() throws FileNotFoundException, IOException {
     final File file =
-        writeValuesWithWritableByteChannelFactory(CompressionType.UNCOMPRESSED, "abc", "123");
+        writeValuesWithCompression(Compression.UNCOMPRESSED, "abc", "123");
     // Read uncompressed data back in using standard API.
     assertReadValues(
         new BufferedReader(
@@ -462,11 +461,11 @@ public class FileBasedSinkTest {
     }
   }
 
-  private File writeValuesWithWritableByteChannelFactory(
-      final WritableByteChannelFactory factory, String... values) throws IOException {
+  private File writeValuesWithCompression(
+      Compression compression, String... values) throws IOException {
     final File file = tmpFolder.newFile("test.gz");
     final WritableByteChannel channel =
-        factory.create(Channels.newChannel(new FileOutputStream(file)));
+        compression.writeCompressed(Channels.newChannel(new FileOutputStream(file)));
     for (String value : values) {
       channel.write(ByteBuffer.wrap((value + "\n").getBytes(StandardCharsets.UTF_8)));
     }
@@ -512,7 +511,7 @@ public class FileBasedSinkTest {
   /** Build a SimpleSink with default options. */
   private SimpleSink<Void> buildSink() {
     return SimpleSink.makeSimpleSink(
-        getBaseOutputDirectory(), "file", "-SS-of-NN", ".test", CompressionType.UNCOMPRESSED);
+        getBaseOutputDirectory(), "file", "-SS-of-NN", ".test", Compression.UNCOMPRESSED);
   }
 
   /** Build a SimpleWriteOperation with default options and the given temporary directory. */

http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
index 382898d..b59876f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
@@ -36,12 +36,19 @@ class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT, Strin
     super(StaticValueProvider.of(tempDirectory), dynamicDestinations, writableByteChannelFactory);
   }
 
+  public SimpleSink(
+      ResourceId tempDirectory,
+      DynamicDestinations<String, DestinationT, String> dynamicDestinations,
+      Compression compression) {
+    super(StaticValueProvider.of(tempDirectory), dynamicDestinations, compression);
+  }
+
   public static SimpleSink<Void> makeSimpleSink(
       ResourceId tempDirectory, FilenamePolicy filenamePolicy) {
     return new SimpleSink<>(
         tempDirectory,
         DynamicFileDestinations.<String>constant(filenamePolicy),
-        CompressionType.UNCOMPRESSED);
+        Compression.UNCOMPRESSED);
   }
 
   public static SimpleSink<Void> makeSimpleSink(
@@ -61,6 +68,20 @@ class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT, Strin
     return new SimpleSink<>(baseDirectory, dynamicDestinations, writableByteChannelFactory);
   }
 
+  public static SimpleSink<Void> makeSimpleSink(
+      ResourceId baseDirectory,
+      String prefix,
+      String shardTemplate,
+      String suffix,
+      Compression compression) {
+    return makeSimpleSink(
+        baseDirectory,
+        prefix,
+        shardTemplate,
+        suffix,
+        FileBasedSink.CompressionType.fromCanonical(compression));
+  }
+
   @Override
   public SimpleWriteOperation<DestinationT> createWriteOperation() {
     return new SimpleWriteOperation<>(this);

http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
index d564d3b..6e5e4da 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
@@ -17,11 +17,10 @@
  */
 package org.apache.beam.sdk.io;
 
-import static org.apache.beam.sdk.io.TFRecordIO.CompressionType;
-import static org.apache.beam.sdk.io.TFRecordIO.CompressionType.AUTO;
-import static org.apache.beam.sdk.io.TFRecordIO.CompressionType.GZIP;
-import static org.apache.beam.sdk.io.TFRecordIO.CompressionType.NONE;
-import static org.apache.beam.sdk.io.TFRecordIO.CompressionType.ZLIB;
+import static org.apache.beam.sdk.io.Compression.AUTO;
+import static org.apache.beam.sdk.io.Compression.DEFLATE;
+import static org.apache.beam.sdk.io.Compression.GZIP;
+import static org.apache.beam.sdk.io.Compression.UNCOMPRESSED;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.Matchers.isIn;
 import static org.junit.Assert.assertEquals;
@@ -144,7 +143,7 @@ public class TFRecordIOTest {
   public void testReadDisplayData() {
     TFRecordIO.Read read = TFRecordIO.read()
         .from("foo.*")
-        .withCompressionType(GZIP)
+        .withCompression(GZIP)
         .withoutValidation();
 
     DisplayData displayData = DisplayData.from(read);
@@ -161,7 +160,7 @@ public class TFRecordIOTest {
         .withSuffix("bar")
         .withShardNameTemplate("-SS-of-NN-")
         .withNumShards(100)
-        .withCompressionType(GZIP);
+        .withCompression(GZIP);
 
     DisplayData displayData = DisplayData.from(write);
 
@@ -265,25 +264,25 @@ public class TFRecordIOTest {
   @Test
   @Category(NeedsRunner.class)
   public void runTestRoundTrip() throws IOException {
-    runTestRoundTrip(LARGE, 10, ".tfrecords", NONE, NONE);
+    runTestRoundTrip(LARGE, 10, ".tfrecords", UNCOMPRESSED, UNCOMPRESSED);
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void runTestRoundTripWithEmptyData() throws IOException {
-    runTestRoundTrip(EMPTY, 10, ".tfrecords", NONE, NONE);
+    runTestRoundTrip(EMPTY, 10, ".tfrecords", UNCOMPRESSED, UNCOMPRESSED);
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void runTestRoundTripWithOneShards() throws IOException {
-    runTestRoundTrip(LARGE, 1, ".tfrecords", NONE, NONE);
+    runTestRoundTrip(LARGE, 1, ".tfrecords", UNCOMPRESSED, UNCOMPRESSED);
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void runTestRoundTripWithSuffix() throws IOException {
-    runTestRoundTrip(LARGE, 10, ".suffix", NONE, NONE);
+    runTestRoundTrip(LARGE, 10, ".suffix", UNCOMPRESSED, UNCOMPRESSED);
   }
 
   @Test
@@ -295,13 +294,13 @@ public class TFRecordIOTest {
   @Test
   @Category(NeedsRunner.class)
   public void runTestRoundTripZlib() throws IOException {
-    runTestRoundTrip(LARGE, 10, ".tfrecords", ZLIB, ZLIB);
+    runTestRoundTrip(LARGE, 10, ".tfrecords", DEFLATE, DEFLATE);
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void runTestRoundTripUncompressedFilesWithAuto() throws IOException {
-    runTestRoundTrip(LARGE, 10, ".tfrecords", NONE, AUTO);
+    runTestRoundTrip(LARGE, 10, ".tfrecords", UNCOMPRESSED, AUTO);
   }
 
   @Test
@@ -313,14 +312,14 @@ public class TFRecordIOTest {
   @Test
   @Category(NeedsRunner.class)
   public void runTestRoundTripZlibFilesWithAuto() throws IOException {
-    runTestRoundTrip(LARGE, 10, ".tfrecords", ZLIB, AUTO);
+    runTestRoundTrip(LARGE, 10, ".tfrecords", DEFLATE, AUTO);
   }
 
   private void runTestRoundTrip(Iterable<String> elems,
                                 int numShards,
                                 String suffix,
-                                CompressionType writeCompressionType,
-                                CompressionType readCompressionType) throws IOException {
+                                Compression writeCompression,
+                                Compression readCompression) throws IOException {
     String outputName = "file";
     Path baseDir = Files.createTempDirectory(tempFolder, "test-rt");
     String baseFilename = baseDir.resolve(outputName).toString();
@@ -328,14 +327,14 @@ public class TFRecordIOTest {
     TFRecordIO.Write write = TFRecordIO.write().to(baseFilename)
         .withNumShards(numShards)
         .withSuffix(suffix)
-        .withCompressionType(writeCompressionType);
+        .withCompression(writeCompression);
     p.apply(Create.of(elems).withCoder(StringUtf8Coder.of()))
         .apply(ParDo.of(new StringToByteArray()))
         .apply(write);
     p.run();
 
     TFRecordIO.Read read = TFRecordIO.read().from(baseFilename + "*")
-        .withCompressionType(readCompressionType);
+        .withCompression(readCompression);
     PCollection<String> output = p2.apply(read).apply(ParDo.of(new ByteArrayToString()));
 
     PAssert.that(output).containsInAnyOrder(elems);