You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/21 18:10:01 UTC

[2/3] incubator-beam git commit: TextIOTest: rewrite for code reuse and actually more test coverage

TextIOTest: rewrite for code reuse and actually more test coverage

We had missed a few empty file cases, etc.

Suggestions for improvement still requested. Sad I had to delete the TemporaryFolder @Rule but
could not see how to get one-time file creation with it.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5f7153a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5f7153a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5f7153a3

Branch: refs/heads/master
Commit: 5f7153a3b81d13f9786d24174ba881fb79205784
Parents: 370d592
Author: Dan Halperin <dh...@google.com>
Authored: Tue Sep 20 10:26:55 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Sep 21 11:09:50 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 477 ++++++++-----------
 1 file changed, 199 insertions(+), 278 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5f7153a3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 49f5b16..fdfb652 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -21,8 +21,14 @@ import static org.apache.beam.sdk.TestUtils.INTS_ARRAY;
 import static org.apache.beam.sdk.TestUtils.LINES_ARRAY;
 import static org.apache.beam.sdk.TestUtils.NO_INTS_ARRAY;
 import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY;
+import static org.apache.beam.sdk.io.TextIO.CompressionType.AUTO;
+import static org.apache.beam.sdk.io.TextIO.CompressionType.BZIP2;
+import static org.apache.beam.sdk.io.TextIO.CompressionType.GZIP;
+import static org.apache.beam.sdk.io.TextIO.CompressionType.UNCOMPRESSED;
+import static org.apache.beam.sdk.io.TextIO.CompressionType.ZIP;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
+import static org.apache.beam.sdk.util.IOChannelUtils.resolve;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
@@ -51,8 +57,12 @@ import java.io.PrintStream;
 import java.nio.channels.FileChannel;
 import java.nio.channels.SeekableByteChannel;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
 import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -87,13 +97,13 @@ import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.Mockito;
@@ -110,19 +120,93 @@ public class TextIOTest {
 
   private static final String MY_HEADER = "myHeader";
   private static final String MY_FOOTER = "myFooter";
+  private static final String[] EMPTY = new String[] {};
+  private static final String[] TINY =
+      new String[] {"Irritable eagle", "Optimistic jay", "Fanciful hawk"};
+  private static final String[] LARGE = makeLines(5000);
+
+  private static Path tempFolder;
+  private static File emptyTxt;
+  private static File tinyTxt;
+  private static File largeTxt;
+  private static File emptyGz;
+  private static File tinyGz;
+  private static File largeGz;
+  private static File emptyBzip2;
+  private static File tinyBzip2;
+  private static File largeBzip2;
+  private static File emptyZip;
+  private static File tinyZip;
+  private static File largeZip;
 
   @Rule
-  public TemporaryFolder tmpFolder = new TemporaryFolder();
-  @Rule
   public ExpectedException expectedException = ExpectedException.none();
 
+  private static File writeToFile(String[] lines, String filename, CompressionType compression)
+      throws IOException {
+    File file = tempFolder.resolve(filename).toFile();
+    OutputStream output = new FileOutputStream(file);
+    switch (compression) {
+      case UNCOMPRESSED:
+        break;
+      case GZIP:
+        output = new GZIPOutputStream(output);
+        break;
+      case BZIP2:
+        output = new BZip2CompressorOutputStream(output);
+        break;
+      case ZIP:
+        ZipOutputStream zipOutput = new ZipOutputStream(output);
+        zipOutput.putNextEntry(new ZipEntry("entry"));
+        output = zipOutput;
+        break;
+      default:
+        throw new UnsupportedOperationException(compression.toString());
+    }
+    writeToStreamAndClose(lines, output);
+    return file;
+  }
+
   @BeforeClass
-  public static void setupClass() {
+  public static void setupClass() throws IOException {
     IOChannelUtils.registerStandardIOFactories(TestPipeline.testingPipelineOptions());
+    tempFolder =  Files.createTempDirectory("TextIOTest");
+    // empty files
+    emptyTxt = writeToFile(EMPTY, "empty.txt", CompressionType.UNCOMPRESSED);
+    emptyGz = writeToFile(EMPTY, "empty.gz", GZIP);
+    emptyBzip2 = writeToFile(EMPTY, "empty.bz2", BZIP2);
+    emptyZip = writeToFile(EMPTY, "empty.zip", ZIP);
+    // tiny files
+    tinyTxt = writeToFile(TINY, "tiny.txt", CompressionType.UNCOMPRESSED);
+    tinyGz = writeToFile(TINY, "tiny.gz", GZIP);
+    tinyBzip2 = writeToFile(TINY, "tiny.bz2", BZIP2);
+    tinyZip = writeToFile(TINY, "tiny.zip", ZIP);
+    // large files
+    largeTxt = writeToFile(LARGE, "large.txt", CompressionType.UNCOMPRESSED);
+    largeGz = writeToFile(LARGE, "large.gz", GZIP);
+    largeBzip2 = writeToFile(LARGE, "large.bz2", BZIP2);
+    largeZip = writeToFile(LARGE, "large.zip", ZIP);
+  }
+
+  @AfterClass
+  public static void testdownClass() throws IOException {
+    Files.walkFileTree(tempFolder, new SimpleFileVisitor<Path>() {
+      @Override
+      public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+        Files.delete(file);
+        return FileVisitResult.CONTINUE;
+      }
+
+      @Override
+      public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
+        Files.delete(dir);
+        return FileVisitResult.CONTINUE;
+      }
+    });
   }
 
-  <T> void runTestRead(T[] expected, Coder<T> coder) throws Exception {
-    File tmpFile = tmpFolder.newFile("file.txt");
+  private <T> void runTestRead(T[] expected, Coder<T> coder) throws Exception {
+    File tmpFile = Files.createTempFile(tempFolder, "file", "txt").toFile();
     String filename = tmpFile.getPath();
 
     try (PrintStream writer = new PrintStream(new FileOutputStream(tmpFile))) {
@@ -182,32 +266,27 @@ public class TextIOTest {
 
   @Test
   public void testReadNamed() throws Exception {
-    String file = tmpFolder.newFile().getAbsolutePath();
     Pipeline p = TestPipeline.create();
 
-    {
-      PCollection<String> output1 =
-          p.apply(TextIO.Read.from(file));
-      assertEquals("TextIO.Read/Read.out", output1.getName());
-    }
-
-    {
-      PCollection<String> output2 = p.apply("MyRead", TextIO.Read.from(file));
-      assertEquals("MyRead/Read.out", output2.getName());
-    }
+    assertEquals(
+        "TextIO.Read/Read.out",
+        p.apply(TextIO.Read.withoutValidation().from("somefile")).getName());
+    assertEquals(
+        "MyRead/Read.out",
+        p.apply("MyRead", TextIO.Read.withoutValidation().from(emptyTxt.getPath())).getName());
   }
 
   @Test
   public void testReadDisplayData() {
     TextIO.Read.Bound<?> read = TextIO.Read
         .from("foo.*")
-        .withCompressionType(CompressionType.BZIP2)
+        .withCompressionType(BZIP2)
         .withoutValidation();
 
     DisplayData displayData = DisplayData.from(read);
 
     assertThat(displayData, hasDisplayItem("filePattern", "foo.*"));
-    assertThat(displayData, hasDisplayItem("compressionType", CompressionType.BZIP2.toString()));
+    assertThat(displayData, hasDisplayItem("compressionType", BZIP2.toString()));
     assertThat(displayData, hasDisplayItem("validation", false));
   }
 
@@ -225,22 +304,24 @@ public class TextIOTest {
         displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar")))));
   }
 
-  <T> void runTestWrite(T[] elems, Coder<T> coder) throws Exception {
+  private <T> void runTestWrite(T[] elems, Coder<T> coder) throws Exception {
     runTestWrite(elems, null, null, coder, 1);
   }
 
-  <T> void runTestWrite(T[] elems, Coder<T> coder, int numShards) throws Exception {
+  private <T> void runTestWrite(T[] elems, Coder<T> coder, int numShards) throws Exception {
     runTestWrite(elems, null, null, coder, numShards);
   }
 
-  <T> void runTestWrite(T[] elems, Coder<T> coder, String header, String footer) throws Exception {
+  private <T> void runTestWrite(T[] elems, Coder<T> coder, String header, String footer)
+      throws Exception {
     runTestWrite(elems, header, footer, coder, 1);
   }
 
-  <T> void runTestWrite(T[] elems, String header, String footer, Coder<T> coder, int numShards)
-      throws Exception {
+  private <T> void runTestWrite(
+      T[] elems, String header, String footer, Coder<T> coder, int numShards) throws Exception {
     String outputName = "file.txt";
-    String baseFilename = tmpFolder.newFile(outputName).getPath();
+    Path baseDir = Files.createTempDirectory(tempFolder, "testwrite");
+    String baseFilename = baseDir.resolve(outputName).toString();
 
     Pipeline p = TestPipeline.create();
 
@@ -265,7 +346,7 @@ public class TextIOTest {
 
     p.run();
 
-    assertOutputFiles(elems, header, footer, coder, numShards, tmpFolder, outputName,
+    assertOutputFiles(elems, header, footer, coder, numShards, baseDir, outputName,
         write.getShardNameTemplate());
   }
 
@@ -275,14 +356,14 @@ public class TextIOTest {
       final String footer,
       Coder<T> coder,
       int numShards,
-      TemporaryFolder rootLocation,
+      Path rootLocation,
       String outputName,
       String shardNameTemplate)
       throws Exception {
     List<File> expectedFiles = new ArrayList<>();
     if (numShards == 0) {
       String pattern =
-          IOChannelUtils.resolve(rootLocation.getRoot().getAbsolutePath(), outputName + "*");
+          resolve(rootLocation.toAbsolutePath().toString(), outputName + "*");
       for (String expected : IOChannelUtils.getFactory(pattern).match(pattern)) {
         expectedFiles.add(new File(expected));
       }
@@ -290,7 +371,7 @@ public class TextIOTest {
       for (int i = 0; i < numShards; i++) {
         expectedFiles.add(
             new File(
-                rootLocation.getRoot(),
+                rootLocation.toString(),
                 IOChannelUtils.constructName(outputName, shardNameTemplate, "", i, numShards)));
       }
     }
@@ -312,8 +393,7 @@ public class TextIOTest {
     }
 
     List<String> expectedElements = new ArrayList<>(elems.length);
-    for (int i = 0; i < elems.length; i++) {
-      T elem = elems[i];
+    for (T elem : elems) {
       byte[] encodedElem = CoderUtils.encodeToByteArray(coder, elem);
       String line = new String(encodedElem);
       expectedElements.add(line);
@@ -458,9 +538,8 @@ public class TextIOTest {
 
   @Test
   public void testUnsupportedFilePattern() throws IOException {
-    File outFolder = tmpFolder.newFolder();
     // Windows doesn't like resolving paths with * in them.
-    String filename = outFolder.toPath().resolve("output@5").toString();
+    String filename = tempFolder.resolve("output@5").toString();
 
     Pipeline p = TestPipeline.create();
 
@@ -505,9 +584,9 @@ public class TextIOTest {
   @Test
   public void testCompressionTypeIsSet() throws Exception {
     TextIO.Read.Bound<String> read = TextIO.Read.from("gs://bucket/test");
-    assertEquals(CompressionType.AUTO, read.getCompressionType());
-    read = TextIO.Read.from("gs://bucket/test").withCompressionType(CompressionType.GZIP);
-    assertEquals(CompressionType.GZIP, read.getCompressionType());
+    assertEquals(AUTO, read.getCompressionType());
+    read = TextIO.Read.from("gs://bucket/test").withCompressionType(GZIP);
+    assertEquals(GZIP, read.getCompressionType());
   }
 
   /**
@@ -527,10 +606,10 @@ public class TextIOTest {
    * and asserts that the results match the given expected output.
    */
   private static void assertReadingCompressedFileMatchesExpected(
-      String filename, CompressionType compressionType, String[] expected) {
+      File file, CompressionType compressionType, String[] expected) {
     Pipeline p = TestPipeline.create();
     TextIO.Read.Bound<String> read =
-        TextIO.Read.from(filename).withCompressionType(compressionType);
+        TextIO.Read.from(file.getPath()).withCompressionType(compressionType);
     PCollection<String> output = p.apply(read);
 
     PAssert.that(output).containsInAnyOrder(expected);
@@ -554,29 +633,8 @@ public class TextIOTest {
   @Test
   @Category(NeedsRunner.class)
   public void testSmallCompressedGzipReadNoExtension() throws Exception {
-    String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"};
-    File tmpFile = tmpFolder.newFile(); // no GZ extension
-    String filename = tmpFile.getPath();
-
-    writeToStreamAndClose(lines, new GZIPOutputStream(new FileOutputStream(tmpFile)));
-    assertReadingCompressedFileMatchesExpected(filename, CompressionType.GZIP, lines);
-  }
-
-  /**
-   * Tests reading from a small, gzipped file with .gz extension and AUTO or GZIP compression set.
-   */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testSmallCompressedGzipRead() throws Exception {
-    String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"};
-    File tmpFile = tmpFolder.newFile("small_gzip.gz");
-    String filename = tmpFile.getPath();
-
-    writeToStreamAndClose(lines, new GZIPOutputStream(new FileOutputStream(tmpFile)));
-    // Should work in AUTO mode.
-    assertReadingCompressedFileMatchesExpected(filename, CompressionType.AUTO, lines);
-    // Should work in GZIP mode.
-    assertReadingCompressedFileMatchesExpected(filename, CompressionType.GZIP, lines);
+    File smallGzNoExtension = writeToFile(TINY, "tiny_gz_no_extension", GZIP);
+    assertReadingCompressedFileMatchesExpected(smallGzNoExtension, GZIP, TINY);
   }
 
   /**
@@ -587,15 +645,12 @@ public class TextIOTest {
   @Test
   @Category(NeedsRunner.class)
   public void testSmallCompressedGzipReadActuallyUncompressed() throws Exception {
-    String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"};
-    File tmpFile = tmpFolder.newFile("not_really_gzipped.gz"); // GZ file extension lies
-    String filename = tmpFile.getPath();
-
-    writeToStreamAndClose(lines, new FileOutputStream(tmpFile));
+    File smallGzNotCompressed =
+        writeToFile(TINY, "tiny_uncompressed.gz", CompressionType.UNCOMPRESSED);
     // Should work with GZIP compression set.
-    assertReadingCompressedFileMatchesExpected(filename, CompressionType.GZIP, lines);
+    assertReadingCompressedFileMatchesExpected(smallGzNotCompressed, GZIP, TINY);
     // Should also work with AUTO mode set.
-    assertReadingCompressedFileMatchesExpected(filename, CompressionType.AUTO, lines);
+    assertReadingCompressedFileMatchesExpected(smallGzNotCompressed, AUTO, TINY);
   }
 
   /**
@@ -604,109 +659,8 @@ public class TextIOTest {
   @Test
   @Category(NeedsRunner.class)
   public void testSmallCompressedBzip2ReadNoExtension() throws Exception {
-    String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"};
-    File tmpFile = tmpFolder.newFile(); // no BZ2 extension
-    String filename = tmpFile.getPath();
-
-    writeToStreamAndClose(lines, new BZip2CompressorOutputStream(new FileOutputStream(tmpFile)));
-    assertReadingCompressedFileMatchesExpected(filename, CompressionType.BZIP2, lines);
-  }
-
-  /**
-   * Tests reading from a small, bzip2ed file with .bz2 extension and AUTO or BZIP2 compression set.
-   */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testSmallCompressedBzipRead() throws Exception {
-    String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"};
-    File tmpFile = tmpFolder.newFile("small_bzip2.bz2");
-    String filename = tmpFile.getPath();
-
-    writeToStreamAndClose(lines, new BZip2CompressorOutputStream(new FileOutputStream(tmpFile)));
-    // Should work in AUTO mode.
-    assertReadingCompressedFileMatchesExpected(filename, CompressionType.AUTO, lines);
-    // Should work in BZIP2 mode.
-    assertReadingCompressedFileMatchesExpected(filename, CompressionType.BZIP2, lines);
-  }
-
-  /**
-   * Tests reading from a large, bzip2ed file with .bz2 extension and AUTO or BZIP2 compression set.
-   * It is important to test a large compressible file because using only small files may mask bugs
-   * from range tracking that can only occur if the file compression ratio is high -- small
-   * compressed files are usually as big as the uncompressed ones or bigger.
-   */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testLargeCompressedBzipRead() throws Exception {
-    String[] lines = makeLines(5000);
-    File bz2File = tmpFolder.newFile("large_bzip2.bz2");
-    String bz2Filename = bz2File.getPath();
-
-    writeToStreamAndClose(lines, new BZip2CompressorOutputStream(new FileOutputStream(bz2File)));
-    // Should work in AUTO mode.
-    assertReadingCompressedFileMatchesExpected(bz2Filename, CompressionType.AUTO, lines);
-    // Should work in BZIP2 mode.
-    assertReadingCompressedFileMatchesExpected(bz2Filename, CompressionType.BZIP2, lines);
-
-    // Confirm that the compressed file is smaller than the uncompressed file.
-    File txtFile = tmpFolder.newFile("large_bzip2.txt");
-    writeToStreamAndClose(lines, new FileOutputStream(txtFile));
-    assertThat(Files.size(txtFile.toPath()), greaterThan(Files.size(bz2File.toPath())));
-  }
-
-  /**
-   * Tests reading from a large, gzipped file with .gz extension and AUTO or GZIP compression set.
-   * It is important to test a large compressible file because using only small files may mask bugs
-   * from range tracking that can only occur if the file compression ratio is high -- small
-   * compressed files are usually as big as the uncompressed ones or bigger.
-   */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testLargeCompressedGzipRead() throws Exception {
-    String[] lines = makeLines(5000);
-    File gzFile = tmpFolder.newFile("large_gzip.gz");
-    String gzFilename = gzFile.getPath();
-
-    writeToStreamAndClose(lines, new GZIPOutputStream(new FileOutputStream(gzFile)));
-    // Should work in AUTO mode.
-    assertReadingCompressedFileMatchesExpected(gzFilename, CompressionType.AUTO, lines);
-    // Should work in BZIP2 mode.
-    assertReadingCompressedFileMatchesExpected(gzFilename, CompressionType.GZIP, lines);
-
-    // Confirm that the compressed file is smaller than the uncompressed file.
-    File txtFile = tmpFolder.newFile("large_gzip.txt");
-    writeToStreamAndClose(lines, new FileOutputStream(txtFile));
-    assertThat(Files.size(txtFile.toPath()), greaterThan(Files.size(gzFile.toPath())));
-  }
-
-  /**
-   * Tests reading from a large, uncompressed file.
-   */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testLargeUncompressedReadTxt() throws Exception {
-    String[] lines = makeLines(5000);
-    File txtFile = tmpFolder.newFile("large_file.txt");
-    String txtFilename = txtFile.getPath();
-
-    writeToStreamAndClose(lines, new FileOutputStream(txtFile));
-    // Should work in AUTO mode.
-    assertReadingCompressedFileMatchesExpected(txtFilename, CompressionType.AUTO, lines);
-  }
-
-  /**
-   * Tests reading from a large, uncompressed file with a weird file extension.
-   */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testLargeUncompressedReadWeirdExtension() throws Exception {
-    String[] lines = makeLines(5000);
-    File txtFile = tmpFolder.newFile("large_file.bin.data.foo");
-    String txtFilename = txtFile.getPath();
-
-    writeToStreamAndClose(lines, new FileOutputStream(txtFile));
-    // Should work in AUTO mode.
-    assertReadingCompressedFileMatchesExpected(txtFilename, CompressionType.AUTO, lines);
+    File smallBz2NoExtension = writeToFile(TINY, "tiny_bz2_no_extension", BZIP2);
+    assertReadingCompressedFileMatchesExpected(smallBz2NoExtension, BZIP2, TINY);
   }
 
   /**
@@ -718,16 +672,11 @@ public class TextIOTest {
    * @return The zip filename.
    * @throws Exception In case of a failure during zip file creation.
    */
-  private String createZipFile(List<String> expected, @Nullable String filename, String[]
+  private String createZipFile(List<String> expected, String filename, String[]
       ...
       fieldsEntries)
       throws Exception {
-    File tmpFile;
-    if (filename != null) {
-      tmpFile = tmpFolder.newFile(filename);
-    } else {
-      tmpFile = tmpFolder.newFile();
-    }
+    File tmpFile = tempFolder.resolve(filename).toFile();
     String tmpFileName = tmpFile.getPath();
 
     ZipOutputStream out = new ZipOutputStream(new FileOutputStream(tmpFile));
@@ -750,87 +699,85 @@ public class TextIOTest {
     return tmpFileName;
   }
 
-  /**
-   * Read a zip compressed file. The user provides the ZIP compression type.
-   * We expect a PCollection with the lines.
-   */
   @Test
   @Category(NeedsRunner.class)
-  public void testZipCompressedRead() throws Exception {
-    String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"};
-    List<String> expected = new ArrayList<>();
-
-    String filename = createZipFile(expected, null, lines);
+  public void testTxtRead() throws Exception {
+    // Files with non-compressed extensions should work in AUTO and UNCOMPRESSED modes.
+    for (CompressionType type : new CompressionType[] { AUTO, UNCOMPRESSED }) {
+      assertReadingCompressedFileMatchesExpected(emptyTxt, type, EMPTY);
+      assertReadingCompressedFileMatchesExpected(tinyTxt, type, TINY);
+      assertReadingCompressedFileMatchesExpected(largeTxt, type, LARGE);
+    }
+  }
 
-    Pipeline p = TestPipeline.create();
+  @Test
+  @Category(NeedsRunner.class)
+  public void testGzipCompressedRead() throws Exception {
+    // Files with the right extensions should work in AUTO and GZIP modes.
+    for (CompressionType type : new CompressionType[] { AUTO, GZIP }) {
+      assertReadingCompressedFileMatchesExpected(emptyGz, type, EMPTY);
+      assertReadingCompressedFileMatchesExpected(tinyGz, type, TINY);
+      assertReadingCompressedFileMatchesExpected(largeGz, type, LARGE);
+    }
 
-    TextIO.Read.Bound<String> read =
-        TextIO.Read.from(filename).withCompressionType(CompressionType.ZIP);
-    PCollection<String> output = p.apply(read);
+    // Sanity check that we're properly testing compression.
+    assertThat(largeTxt.length(), greaterThan(largeGz.length()));
 
-    PAssert.that(output).containsInAnyOrder(expected);
-    p.run();
+    // GZIP files with non-gz extension should work in GZIP mode.
+    File gzFile = writeToFile(TINY, "tiny_gz_no_extension", GZIP);
+    assertReadingCompressedFileMatchesExpected(gzFile, GZIP, TINY);
   }
 
-  /**
-   * Read a zip compressed file. The ZIP compression type is auto-detected based on the
-   * file extension. We expect a PCollection with the lines.
-   */
   @Test
   @Category(NeedsRunner.class)
-  public void testZipCompressedAutoDetected() throws Exception {
-    String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"};
-    List<String> expected = new ArrayList<>();
-
-    String filename = createZipFile(expected, "testZipCompressedAutoDetected.zip", lines);
-
-    // test with auto-detect ZIP based on extension.
-    Pipeline p = TestPipeline.create();
+  public void testBzip2CompressedRead() throws Exception {
+    // Files with the right extensions should work in AUTO and BZIP2 modes.
+    for (CompressionType type : new CompressionType[] { AUTO, BZIP2 }) {
+      assertReadingCompressedFileMatchesExpected(emptyBzip2, type, EMPTY);
+      assertReadingCompressedFileMatchesExpected(tinyBzip2, type, TINY);
+      assertReadingCompressedFileMatchesExpected(largeBzip2, type, LARGE);
+    }
 
-    TextIO.Read.Bound<String> read = TextIO.Read.from(filename);
-    PCollection<String> output = p.apply(read);
+    // Sanity check that we're properly testing compression.
+    assertThat(largeTxt.length(), greaterThan(largeBzip2.length()));
 
-    PAssert.that(output).containsInAnyOrder(expected);
-    p.run();
+    // BZ2 files with non-bz2 extension should work in BZIP2 mode.
+    File bz2File = writeToFile(TINY, "tiny_bz2_no_extension", BZIP2);
+    assertReadingCompressedFileMatchesExpected(bz2File, BZIP2, TINY);
   }
 
-  /**
-   * Read a ZIP compressed empty file. We expect an empty PCollection.
-   */
   @Test
   @Category(NeedsRunner.class)
-  public void testZipCompressedReadWithEmptyFile() throws Exception {
-    String filename = createZipFile(new ArrayList<String>(), null);
-
-    Pipeline p = TestPipeline.create();
+  public void testZipCompressedRead() throws Exception {
+    // Files with the right extensions should work in AUTO and ZIP modes.
+    for (CompressionType type : new CompressionType[]{AUTO, ZIP}) {
+      assertReadingCompressedFileMatchesExpected(emptyZip, type, EMPTY);
+      assertReadingCompressedFileMatchesExpected(tinyZip, type, TINY);
+      assertReadingCompressedFileMatchesExpected(largeZip, type, LARGE);
+    }
 
-    PCollection<String> output = p.apply(TextIO.Read.from(filename).withCompressionType
-        (CompressionType.ZIP));
-    PAssert.that(output).empty();
+    // Sanity check that we're properly testing compression.
+    assertThat(largeTxt.length(), greaterThan(largeZip.length()));
 
-    p.run();
+    // Zip files with non-zip extension should work in ZIP mode.
+    File zipFile = writeToFile(TINY, "tiny_zip_no_extension", ZIP);
+    assertReadingCompressedFileMatchesExpected(zipFile, ZIP, TINY);
   }
 
   /**
-   * Read a ZIP compressed file containing an unique empty entry. We expect an empty PCollection.
+   * Tests a zip file with no entries. This is a corner case not tested elsewhere as the default
+   * test zip files have a single entry.
    */
   @Test
   @Category(NeedsRunner.class)
-  public void testZipCompressedReadWithEmptyEntry() throws Exception {
-    String filename = createZipFile(new ArrayList<String>(), null, new String[]{});
-
-    Pipeline p = TestPipeline.create();
-
-    PCollection<String> output = p.apply(TextIO.Read.from(filename).withCompressionType
-        (CompressionType.ZIP));
-    PAssert.that(output).empty();
-
-    p.run();
+  public void testZipCompressedReadWithNoEntries() throws Exception {
+    String filename = createZipFile(new ArrayList<String>(), "empty zip file");
+    assertReadingCompressedFileMatchesExpected(new File(filename), CompressionType.ZIP, EMPTY);
   }
 
   /**
-   * Read a ZIP compressed file with multiple entries. We expect a PCollection containing
-   * lines from all entries.
+   * Tests a zip file with multiple entries. This is a corner case not tested elsewhere as the
+   * default test zip files have a single entry.
    */
   @Test
   @Category(NeedsRunner.class)
@@ -841,16 +788,9 @@ public class TextIOTest {
 
     List<String> expected = new ArrayList<>();
 
-    String filename = createZipFile(expected, null, entry0, entry1, entry2);
-
-    Pipeline p = TestPipeline.create();
-
-    TextIO.Read.Bound<String> read =
-        TextIO.Read.from(filename).withCompressionType(CompressionType.ZIP);
-    PCollection<String> output = p.apply(read);
-
-    PAssert.that(output).containsInAnyOrder(expected);
-    p.run();
+    String filename = createZipFile(expected, "multiple entries", entry0, entry1, entry2);
+    assertReadingCompressedFileMatchesExpected(
+        new File(filename), CompressionType.ZIP, expected.toArray(new String[]{}));
   }
 
   /**
@@ -862,27 +802,20 @@ public class TextIOTest {
   public void testZipCompressedReadWithComplexEmptyAndPresentEntries() throws Exception {
     String filename = createZipFile(
         new ArrayList<String>(),
-        null,
+        "complex empty and present entries",
         new String[]{"cat"},
         new String[]{},
         new String[]{},
         new String[]{"dog"});
-    List<String> expected = ImmutableList.of("cat", "dog");
 
-    Pipeline p = TestPipeline.create();
-
-    PCollection<String> output =
-        p.apply(TextIO.Read.from(filename).withCompressionType(CompressionType.ZIP));
-    PAssert.that(output).containsInAnyOrder(expected);
-
-    p.run();
+    assertReadingCompressedFileMatchesExpected(
+        new File(filename), CompressionType.ZIP, new String[] {"cat", "dog"});
   }
 
   @Test
   public void testTextIOGetName() {
     assertEquals("TextIO.Read", TextIO.Read.from("somefile").getName());
     assertEquals("TextIO.Write", TextIO.Write.to("somefile").getName());
-
     assertEquals("TextIO.Read", TextIO.Read.from("somefile").toString());
   }
 
@@ -1118,23 +1051,20 @@ public class TextIOTest {
   }
 
   private TextSource<String> prepareSource(byte[] data) throws IOException {
-    File file = tmpFolder.newFile();
-    Files.write(file.toPath(), data);
-    return new TextSource<>(file.toPath().toString(), StringUtf8Coder.of());
+    Path path = Files.createTempFile(tempFolder, "tempfile", "ext");
+    Files.write(path, data);
+    return new TextSource<>(path.toString(), StringUtf8Coder.of());
   }
 
   @Test
   public void testInitialSplitIntoBundlesAutoModeTxt() throws Exception {
-    String[] lines = makeLines(5000);
-    File file = tmpFolder.newFile("to_be_split_auto.txt");
-    writeToStreamAndClose(lines, new FileOutputStream(file));
     PipelineOptions options = TestPipeline.testingPipelineOptions();
     long desiredBundleSize = 1000;
 
     // Sanity check: file is at least 2 bundles long.
-    assertThat(file.length(), greaterThan(2 * desiredBundleSize));
+    assertThat(largeTxt.length(), greaterThan(2 * desiredBundleSize));
 
-    FileBasedSource<String> source = TextIO.Read.from(file.getPath()).getSource();
+    FileBasedSource<String> source = TextIO.Read.from(largeTxt.getPath()).getSource();
     List<? extends FileBasedSource<String>> splits =
         source.splitIntoBundles(desiredBundleSize, options);
 
@@ -1145,16 +1075,13 @@ public class TextIOTest {
 
   @Test
   public void testInitialSplitIntoBundlesAutoModeGz() throws Exception {
-    String[] lines = makeLines(5000);
-    File file = tmpFolder.newFile("to_be_split_auto.gz");
-    writeToStreamAndClose(lines, new GZIPOutputStream(new FileOutputStream(file)));
-    PipelineOptions options = TestPipeline.testingPipelineOptions();
     long desiredBundleSize = 1000;
+    PipelineOptions options = TestPipeline.testingPipelineOptions();
 
     // Sanity check: file is at least 2 bundles long.
-    assertThat(file.length(), greaterThan(2 * desiredBundleSize));
+    assertThat(largeGz.length(), greaterThan(2 * desiredBundleSize));
 
-    FileBasedSource<String> source = TextIO.Read.from(file.getPath()).getSource();
+    FileBasedSource<String> source = TextIO.Read.from(largeGz.getPath()).getSource();
     List<? extends FileBasedSource<String>> splits =
         source.splitIntoBundles(desiredBundleSize, options);
 
@@ -1165,38 +1092,32 @@ public class TextIOTest {
 
   @Test
   public void testInitialSplitIntoBundlesGzipModeTxt() throws Exception {
-    String[] lines = makeLines(5000);
-    File file = tmpFolder.newFile("to_be_split_gzip.txt");
-    writeToStreamAndClose(lines, new FileOutputStream(file));
     PipelineOptions options = TestPipeline.testingPipelineOptions();
     long desiredBundleSize = 1000;
 
     // Sanity check: file is at least 2 bundles long.
-    assertThat(file.length(), greaterThan(2 * desiredBundleSize));
+    assertThat(largeTxt.length(), greaterThan(2 * desiredBundleSize));
 
     FileBasedSource<String> source =
-        TextIO.Read.from(file.getPath()).withCompressionType(CompressionType.GZIP).getSource();
+        TextIO.Read.from(largeTxt.getPath()).withCompressionType(GZIP).getSource();
     List<? extends FileBasedSource<String>> splits =
         source.splitIntoBundles(desiredBundleSize, options);
 
-    // Exactly 1 split, even though .txt extension, since using GZIP mode.
+    // Exactly 1 split, even though splittable text file, since using GZIP mode.
     assertThat(splits, hasSize(equalTo(1)));
     SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
   }
 
   @Test
   public void testInitialSplitIntoBundlesGzipModeGz() throws Exception {
-    String[] lines = makeLines(5000);
-    File file = tmpFolder.newFile("to_be_split_gzip.gz");
-    writeToStreamAndClose(lines, new GZIPOutputStream(new FileOutputStream(file)));
     PipelineOptions options = TestPipeline.testingPipelineOptions();
     long desiredBundleSize = 1000;
 
     // Sanity check: file is at least 2 bundles long.
-    assertThat(file.length(), greaterThan(2 * desiredBundleSize));
+    assertThat(largeGz.length(), greaterThan(2 * desiredBundleSize));
 
     FileBasedSource<String> source =
-        TextIO.Read.from(file.getPath()).withCompressionType(CompressionType.GZIP).getSource();
+        TextIO.Read.from(largeGz.getPath()).withCompressionType(GZIP).getSource();
     List<? extends FileBasedSource<String>> splits =
         source.splitIntoBundles(desiredBundleSize, options);