Posted to commits@beam.apache.org by jk...@apache.org on 2017/12/06 22:10:11 UTC

[beam] branch master updated (b9b85fd -> 7564d82)

This is an automated email from the ASF dual-hosted git repository.

jkff pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


    from b9b85fd  Update the version of org.xolstice.maven.plugins to 0.5.1 to attempt to fix https://issues.apache.org/jira/browse/BEAM-2823 for failures in beam_PostCommit_Java_MavenInstall_Windows
     new 1a6c25c  [BEAM-3060] add TFRecordIOIT
     new 7564d82  add post-review updates

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../beam/sdk/io/common/IOTestPipelineOptions.java  |   3 +-
 .../beam/sdk/io/common/FileBasedIOITHelper.java    | 103 ++++++++++++++++
 .../java/org/apache/beam/sdk/io/text/TextIOIT.java |  94 ++------------
 .../apache/beam/sdk/io/tfrecord/TFRecordIOIT.java  | 137 +++++++++++++++++++++
 4 files changed, 254 insertions(+), 83 deletions(-)
 create mode 100644 sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
 create mode 100644 sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java

-- 
To stop receiving notification emails like this one, please contact
['"commits@beam.apache.org" <co...@beam.apache.org>'].

[beam] 01/02: [BEAM-3060] add TFRecordIOIT

Posted by jk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 1a6c25c7c547de882b7ec4da80a5ace1d01f397d
Author: Łukasz Gajowy <lu...@polidea.com>
AuthorDate: Tue Nov 28 13:23:28 2017 -0800

    [BEAM-3060] add TFRecordIOIT
---
 .../beam/sdk/io/common/IOTestPipelineOptions.java  |   2 +-
 .../beam/sdk/io/common/AbstractFileBasedIOIT.java  | 111 +++++++++++++++++
 .../java/org/apache/beam/sdk/io/text/TextIOIT.java |  86 +------------
 .../apache/beam/sdk/io/tfrecord/TFRecordIOIT.java  | 133 +++++++++++++++++++++
 4 files changed, 251 insertions(+), 81 deletions(-)

diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index 5a29d4f..d919654 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -96,7 +96,7 @@ public interface IOTestPipelineOptions extends TestPipelineOptions {
   void setNumberOfRecords(Long count);
 
   @Description("Destination prefix for files generated by the test")
-  @Default.String("TEXTIOIT")
+  @Default.String("FILEBASEDIOIT")
   String getFilenamePrefix();
 
   void setFilenamePrefix(String prefix);
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/AbstractFileBasedIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/AbstractFileBasedIOIT.java
new file mode 100644
index 0000000..9eb8aea
--- /dev/null
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/AbstractFileBasedIOIT.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.common;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Map;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * Abstract class for file based IO Integration tests.
+ */
+public abstract class AbstractFileBasedIOIT {
+
+  protected static IOTestPipelineOptions readTestPipelineOptions() {
+    PipelineOptionsFactory.register(IOTestPipelineOptions.class);
+    return TestPipeline.testingPipelineOptions().as(IOTestPipelineOptions.class);
+  }
+
+  protected static String appendTimestampToPrefix(String filenamePrefix) {
+    return String.format("%s_%s", filenamePrefix, new Date().getTime());
+  }
+
+  protected static Compression parseCompressionType(String compressionType) {
+    try {
+      return Compression.valueOf(compressionType.toUpperCase());
+    } catch (IllegalArgumentException ex) {
+      throw new IllegalArgumentException(
+          String.format("Unsupported compression type: %s", compressionType));
+    }
+  }
+
+  protected String getExpectedHashForLineCount(Long lineCount) {
+    Map<Long, String> expectedHashes = ImmutableMap.of(
+        100_000L, "4c8bb3b99dcc59459b20fefba400d446",
+        1_000_000L, "9796db06e7a7960f974d5a91164afff1",
+        100_000_000L, "6ce05f456e2fdc846ded2abd0ec1de95"
+    );
+
+    String hash = expectedHashes.get(lineCount);
+    if (hash == null) {
+      throw new UnsupportedOperationException(
+          String.format("No hash for that line count: %s", lineCount)
+      );
+    }
+    return hash;
+  }
+
+  /**
+   * Constructs text lines in files used for testing.
+   */
+  public static class DeterministicallyConstructTestTextLineFn extends DoFn<Long, String> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(String.format("IO IT Test line of text. Line seed: %s", c.element()));
+    }
+  }
+
+  /**
+   * Deletes matching files using the FileSystems API.
+   */
+  public static class DeleteFileFn extends DoFn<String, Void> {
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws IOException {
+      MatchResult match = Iterables
+          .getOnlyElement(FileSystems.match(Collections.singletonList(c.element())));
+
+      Collection<ResourceId> resourceIds = toResourceIds(match);
+
+      FileSystems.delete(resourceIds);
+    }
+    private Collection<ResourceId> toResourceIds(MatchResult match) throws IOException {
+      return FluentIterable.from(match.metadata())
+          .transform(new Function<MatchResult.Metadata, ResourceId>() {
+            @Override
+            public ResourceId apply(MatchResult.Metadata metadata) {
+              return metadata.resourceId();
+            }
+          }).toList();
+    }
+  }
+}
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
index e9aac80..7593f85 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
@@ -20,31 +20,16 @@ package org.apache.beam.sdk.io.text;
 
 import static org.apache.beam.sdk.io.Compression.AUTO;
 
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-
-import java.io.IOException;
 import java.text.ParseException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Map;
-
 import org.apache.beam.sdk.io.Compression;
-import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.common.AbstractFileBasedIOIT;
 import org.apache.beam.sdk.io.common.HashingFn;
 import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
-import org.apache.beam.sdk.io.fs.MatchResult;
-import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.View;
@@ -64,16 +49,16 @@ import org.junit.runners.JUnit4;
  *  -Dit.test=org.apache.beam.sdk.io.text.TextIOIT
  *  -DintegrationTestPipelineOptions='[
  *  "--numberOfRecords=100000",
- *  "--filenamePrefix=TEXTIOIT"
+ *  "--filenamePrefix=FILEBASEDIOIT"
  *  "--compressionType=GZIP"
  *  ]'
  * </pre>
  * </p>
  * <p>Please see 'sdks/java/io/file-based-io-tests/pom.xml' for instructions regarding
  * running this test using Beam performance testing framework.</p>
- * */
+ */
 @RunWith(JUnit4.class)
-public class TextIOIT {
+public class TextIOIT extends AbstractFileBasedIOIT {
 
   private static String filenamePrefix;
   private static Long numberOfTextLines;
@@ -84,28 +69,13 @@ public class TextIOIT {
 
   @BeforeClass
   public static void setup() throws ParseException {
-    PipelineOptionsFactory.register(IOTestPipelineOptions.class);
-    IOTestPipelineOptions options = TestPipeline.testingPipelineOptions()
-        .as(IOTestPipelineOptions.class);
+    IOTestPipelineOptions options = readTestPipelineOptions();
 
     numberOfTextLines = options.getNumberOfRecords();
-    filenamePrefix = appendTimestamp(options.getFilenamePrefix());
+    filenamePrefix = appendTimestampToPrefix(options.getFilenamePrefix());
     compressionType = parseCompressionType(options.getCompressionType());
   }
 
-  private static Compression parseCompressionType(String compressionType) {
-    try {
-      return Compression.valueOf(compressionType.toUpperCase());
-    } catch (IllegalArgumentException ex) {
-      throw new IllegalArgumentException(
-          String.format("Unsupported compression type: %s", compressionType));
-    }
-  }
-
-  private static String appendTimestamp(String filenamePrefix) {
-    return String.format("%s_%s", filenamePrefix, new Date().getTime());
-  }
-
   @Test
   public void writeThenReadAll() {
     TextIO.TypedWrite<String, Object> write = TextIO
@@ -132,48 +102,4 @@ public class TextIOIT {
 
     pipeline.run().waitUntilFinish();
   }
-
-  private static String getExpectedHashForLineCount(Long lineCount) {
-    Map<Long, String> expectedHashes = ImmutableMap.of(
-        100_000L, "4c8bb3b99dcc59459b20fefba400d446",
-        1_000_000L, "9796db06e7a7960f974d5a91164afff1",
-        100_000_000L, "6ce05f456e2fdc846ded2abd0ec1de95"
-    );
-
-    String hash = expectedHashes.get(lineCount);
-    if (hash == null) {
-      throw new UnsupportedOperationException(
-          String.format("No hash for that line count: %s", lineCount));
-    }
-    return hash;
-  }
-
-  private static class DeterministicallyConstructTestTextLineFn extends DoFn<Long, String> {
-
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      c.output(String.format("IO IT Test line of text. Line seed: %s", c.element()));
-    }
-  }
-
-  private static class DeleteFileFn extends DoFn<String, Void> {
-
-    @ProcessElement
-    public void processElement(ProcessContext c) throws IOException {
-      MatchResult match = Iterables
-          .getOnlyElement(FileSystems.match(Collections.singletonList(c.element())));
-      FileSystems.delete(toResourceIds(match));
-    }
-
-    private Collection<ResourceId> toResourceIds(MatchResult match) throws IOException {
-      return FluentIterable.from(match.metadata())
-          .transform(new Function<MatchResult.Metadata, ResourceId>() {
-
-            @Override
-            public ResourceId apply(MatchResult.Metadata metadata) {
-              return metadata.resourceId();
-            }
-          }).toList();
-    }
-  }
 }
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
new file mode 100644
index 0000000..4589942
--- /dev/null
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.tfrecord;
+
+import static org.apache.beam.sdk.io.Compression.AUTO;
+
+import java.text.ParseException;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.TFRecordIO;
+import org.apache.beam.sdk.io.common.AbstractFileBasedIOIT;
+import org.apache.beam.sdk.io.common.HashingFn;
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration tests for {@link org.apache.beam.sdk.io.TFRecordIO}.
+ *
+ * <p>Run this test using the command below. Pass in connection information via PipelineOptions:
+ * <pre>
+ *  mvn -e -Pio-it verify -pl sdks/java/io/file-based-io-tests
+ *  -Dit.test=org.apache.beam.sdk.io.tfrecord.TFRecordIOIT
+ *  -DintegrationTestPipelineOptions='[
+ *  "--numberOfRecords=100000",
+ *  "--filenamePrefix=FILEBASEDIOIT"
+ *  "--compressionType=GZIP"
+ *  ]'
+ * </pre>
+ * </p>
+ * <p>Please {@see 'sdks/java/io/file-based-io-tests/pom.xml'} for instructions regarding
+ * running this test using Beam performance testing framework.</p>
+ */
+@RunWith(JUnit4.class)
+public class TFRecordIOIT extends AbstractFileBasedIOIT {
+
+  private static String filenamePrefix;
+  private static Long numberOfTextLines;
+  private static Compression compressionType;
+
+  @Rule
+  public TestPipeline writePipeline = TestPipeline.create();
+
+  @Rule
+  public TestPipeline readPipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void setup() throws ParseException {
+    IOTestPipelineOptions options = readTestPipelineOptions();
+
+    numberOfTextLines = options.getNumberOfRecords();
+    filenamePrefix = appendTimestampToPrefix(options.getFilenamePrefix());
+    compressionType = parseCompressionType(options.getCompressionType());
+  }
+
+  private static String createFilenamePattern() {
+    return filenamePrefix + "*";
+  }
+
+  // TODO: There are two pipelines due to: https://issues.apache.org/jira/browse/BEAM-3267
+  @Test
+  public void writeThenReadAll() {
+    TFRecordIO.Write writeTransform = TFRecordIO
+        .write()
+        .to(filenamePrefix)
+        .withCompression(compressionType)
+        .withSuffix(".tfrecord");
+
+    writePipeline
+        .apply("Generate sequence", GenerateSequence.from(0).to(numberOfTextLines))
+        .apply("Produce text lines", ParDo.of(new DeterministicallyConstructTestTextLineFn()))
+        .apply("Transform strings to bytes", MapElements.via(new StringToByteArray()))
+        .apply("Write content to files", writeTransform);
+
+    writePipeline.run().waitUntilFinish();
+
+    String filenamePattern = createFilenamePattern();
+    PCollection<String> consolidatedHashcode = readPipeline
+        .apply(TFRecordIO.read().from(filenamePattern).withCompression(AUTO))
+        .apply("Transform bytes to strings", MapElements.via(new ByteArrayToString()))
+        .apply("Calculate hashcode", Combine.globally(new HashingFn()));
+
+    String expectedHash = getExpectedHashForLineCount(numberOfTextLines);
+    PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash);
+
+    readPipeline.apply(Create.of(filenamePattern))
+        .apply("Delete test files", ParDo.of(new DeleteFileFn())
+        .withSideInputs(consolidatedHashcode.apply(View.<String>asSingleton())));
+    readPipeline.run().waitUntilFinish();
+  }
+
+  static class StringToByteArray extends SimpleFunction<String, byte[]> {
+    @Override
+    public byte[] apply(String input) {
+      return input.getBytes();
+    }
+  }
+
+  static class ByteArrayToString extends SimpleFunction<byte[], String> {
+    @Override
+    public String apply(byte[] input) {
+      return new String(input);
+    }
+  }
+}
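
For reference, here is a minimal sketch (hypothetical class and test names, not part of this
commit) of how a subclass could exercise the protected helpers that AbstractFileBasedIOIT
introduces above; the method signatures and the expected hash value are taken from the diff:

    package org.apache.beam.sdk.io.common;

    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.assertTrue;

    import org.apache.beam.sdk.io.Compression;
    import org.junit.Test;

    /** Hypothetical example exercising the shared helpers of AbstractFileBasedIOIT. */
    public class FileBasedIOITHelpersExample extends AbstractFileBasedIOIT {

      @Test
      public void compressionTypeParsingIsCaseInsensitive() {
        // parseCompressionType() upper-cases its argument before Compression.valueOf().
        assertEquals(Compression.GZIP, parseCompressionType("gzip"));
      }

      @Test
      public void timestampIsAppendedToThePrefix() {
        // appendTimestampToPrefix("TEXTIOIT") yields e.g. "TEXTIOIT_1512597011000".
        assertTrue(appendTimestampToPrefix("TEXTIOIT").startsWith("TEXTIOIT_"));
      }

      @Test
      public void expectedHashesExistOnlyForSupportedLineCounts() {
        // Pre-computed hashes exist for 100_000, 1_000_000 and 100_000_000 lines only;
        // other counts throw UnsupportedOperationException.
        assertEquals("4c8bb3b99dcc59459b20fefba400d446", getExpectedHashForLineCount(100_000L));
      }
    }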

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.

[beam] 02/02: add post-review updates

Posted by jk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 7564d822aa28a79f27bd0efa09a213ffd3635708
Author: Łukasz Gajowy <lu...@polidea.com>
AuthorDate: Thu Nov 30 15:59:59 2017 -0800

    add post-review updates
---
 .../beam/sdk/io/common/IOTestPipelineOptions.java  |  3 +-
 ...FileBasedIOIT.java => FileBasedIOITHelper.java} | 50 +++++++++-------------
 .../java/org/apache/beam/sdk/io/text/TextIOIT.java | 16 ++++---
 .../apache/beam/sdk/io/tfrecord/TFRecordIOIT.java  | 16 ++++---
 4 files changed, 43 insertions(+), 42 deletions(-)

diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index d919654..e7b475d 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.common;
 
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
 
 /**
@@ -96,7 +97,7 @@ public interface IOTestPipelineOptions extends TestPipelineOptions {
   void setNumberOfRecords(Long count);
 
   @Description("Destination prefix for files generated by the test")
-  @Default.String("FILEBASEDIOIT")
+  @Validation.Required
   String getFilenamePrefix();
 
   void setFilenamePrefix(String prefix);
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/AbstractFileBasedIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
similarity index 66%
rename from sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/AbstractFileBasedIOIT.java
rename to sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
index 9eb8aea..cf20d8e 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/AbstractFileBasedIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
@@ -18,47 +18,44 @@
 
 package org.apache.beam.sdk.io.common;
 
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.Map;
-import org.apache.beam.sdk.io.Compression;
+import java.util.Set;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.DoFn;
 
 /**
- * Abstract class for file based IO Integration tests.
+ * Contains helper methods for file based IO Integration tests.
  */
-public abstract class AbstractFileBasedIOIT {
+public class FileBasedIOITHelper {
 
-  protected static IOTestPipelineOptions readTestPipelineOptions() {
-    PipelineOptionsFactory.register(IOTestPipelineOptions.class);
-    return TestPipeline.testingPipelineOptions().as(IOTestPipelineOptions.class);
+  private FileBasedIOITHelper() {
   }
 
-  protected static String appendTimestampToPrefix(String filenamePrefix) {
-    return String.format("%s_%s", filenamePrefix, new Date().getTime());
+  public static IOTestPipelineOptions readTestPipelineOptions() {
+    PipelineOptionsFactory.register(IOTestPipelineOptions.class);
+    IOTestPipelineOptions options = TestPipeline
+        .testingPipelineOptions()
+        .as(IOTestPipelineOptions.class);
+
+    return PipelineOptionsValidator.validate(IOTestPipelineOptions.class, options);
   }
 
-  protected static Compression parseCompressionType(String compressionType) {
-    try {
-      return Compression.valueOf(compressionType.toUpperCase());
-    } catch (IllegalArgumentException ex) {
-      throw new IllegalArgumentException(
-          String.format("Unsupported compression type: %s", compressionType));
-    }
+  public static String appendTimestampToPrefix(String filenamePrefix) {
+    return String.format("%s_%s", filenamePrefix, new Date().getTime());
   }
 
-  protected String getExpectedHashForLineCount(Long lineCount) {
+  public static String getExpectedHashForLineCount(Long lineCount) {
     Map<Long, String> expectedHashes = ImmutableMap.of(
         100_000L, "4c8bb3b99dcc59459b20fefba400d446",
         1_000_000L, "9796db06e7a7960f974d5a91164afff1",
@@ -78,6 +75,7 @@ public abstract class AbstractFileBasedIOIT {
    * Constructs text lines in files used for testing.
    */
   public static class DeterministicallyConstructTestTextLineFn extends DoFn<Long, String> {
+
     @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(String.format("IO IT Test line of text. Line seed: %s", c.element()));
@@ -94,18 +92,12 @@ public abstract class AbstractFileBasedIOIT {
       MatchResult match = Iterables
           .getOnlyElement(FileSystems.match(Collections.singletonList(c.element())));
 
-      Collection<ResourceId> resourceIds = toResourceIds(match);
+      Set<ResourceId> resourceIds = new HashSet<>();
+      for (MatchResult.Metadata metadataElem : match.metadata()) {
+        resourceIds.add(metadataElem.resourceId());
+      }
 
       FileSystems.delete(resourceIds);
     }
-    private Collection<ResourceId> toResourceIds(MatchResult match) throws IOException {
-      return FluentIterable.from(match.metadata())
-          .transform(new Function<MatchResult.Metadata, ResourceId>() {
-            @Override
-            public ResourceId apply(MatchResult.Metadata metadata) {
-              return metadata.resourceId();
-            }
-          }).toList();
-    }
   }
 }
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
index 7593f85..f9fad80 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
@@ -19,12 +19,15 @@
 package org.apache.beam.sdk.io.text;
 
 import static org.apache.beam.sdk.io.Compression.AUTO;
+import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampToPrefix;
+import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount;
+import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions;
 
 import java.text.ParseException;
 import org.apache.beam.sdk.io.Compression;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.common.AbstractFileBasedIOIT;
+import org.apache.beam.sdk.io.common.FileBasedIOITHelper;
 import org.apache.beam.sdk.io.common.HashingFn;
 import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
 import org.apache.beam.sdk.testing.PAssert;
@@ -49,7 +52,7 @@ import org.junit.runners.JUnit4;
  *  -Dit.test=org.apache.beam.sdk.io.text.TextIOIT
  *  -DintegrationTestPipelineOptions='[
  *  "--numberOfRecords=100000",
- *  "--filenamePrefix=FILEBASEDIOIT"
+ *  "--filenamePrefix=output_file_path",
  *  "--compressionType=GZIP"
  *  ]'
  * </pre>
@@ -58,7 +61,7 @@ import org.junit.runners.JUnit4;
  * running this test using Beam performance testing framework.</p>
  */
 @RunWith(JUnit4.class)
-public class TextIOIT extends AbstractFileBasedIOIT {
+public class TextIOIT {
 
   private static String filenamePrefix;
   private static Long numberOfTextLines;
@@ -73,7 +76,7 @@ public class TextIOIT extends AbstractFileBasedIOIT {
 
     numberOfTextLines = options.getNumberOfRecords();
     filenamePrefix = appendTimestampToPrefix(options.getFilenamePrefix());
-    compressionType = parseCompressionType(options.getCompressionType());
+    compressionType = Compression.valueOf(options.getCompressionType());
   }
 
   @Test
@@ -86,7 +89,8 @@ public class TextIOIT extends AbstractFileBasedIOIT {
 
     PCollection<String> testFilenames = pipeline
         .apply("Generate sequence", GenerateSequence.from(0).to(numberOfTextLines))
-        .apply("Produce text lines", ParDo.of(new DeterministicallyConstructTestTextLineFn()))
+        .apply("Produce text lines",
+            ParDo.of(new FileBasedIOITHelper.DeterministicallyConstructTestTextLineFn()))
         .apply("Write content to files", write)
         .getPerDestinationOutputFilenames().apply(Values.<String>create());
 
@@ -97,7 +101,7 @@ public class TextIOIT extends AbstractFileBasedIOIT {
     String expectedHash = getExpectedHashForLineCount(numberOfTextLines);
     PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash);
 
-    testFilenames.apply("Delete test files", ParDo.of(new DeleteFileFn())
+    testFilenames.apply("Delete test files", ParDo.of(new FileBasedIOITHelper.DeleteFileFn())
         .withSideInputs(consolidatedHashcode.apply(View.<String>asSingleton())));
 
     pipeline.run().waitUntilFinish();
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
index 4589942..b887316 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
@@ -19,12 +19,15 @@
 package org.apache.beam.sdk.io.tfrecord;
 
 import static org.apache.beam.sdk.io.Compression.AUTO;
+import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampToPrefix;
+import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount;
+import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions;
 
 import java.text.ParseException;
 import org.apache.beam.sdk.io.Compression;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.TFRecordIO;
-import org.apache.beam.sdk.io.common.AbstractFileBasedIOIT;
+import org.apache.beam.sdk.io.common.FileBasedIOITHelper;
 import org.apache.beam.sdk.io.common.HashingFn;
 import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
 import org.apache.beam.sdk.testing.PAssert;
@@ -51,7 +54,7 @@ import org.junit.runners.JUnit4;
  *  -Dit.test=org.apache.beam.sdk.io.tfrecord.TFRecordIOIT
  *  -DintegrationTestPipelineOptions='[
  *  "--numberOfRecords=100000",
- *  "--filenamePrefix=FILEBASEDIOIT"
+ *  "--filenamePrefix=output_file_path",
  *  "--compressionType=GZIP"
  *  ]'
  * </pre>
@@ -60,7 +63,7 @@ import org.junit.runners.JUnit4;
  * running this test using Beam performance testing framework.</p>
  */
 @RunWith(JUnit4.class)
-public class TFRecordIOIT extends AbstractFileBasedIOIT {
+public class TFRecordIOIT {
 
   private static String filenamePrefix;
   private static Long numberOfTextLines;
@@ -78,7 +81,7 @@ public class TFRecordIOIT extends AbstractFileBasedIOIT {
 
     numberOfTextLines = options.getNumberOfRecords();
     filenamePrefix = appendTimestampToPrefix(options.getFilenamePrefix());
-    compressionType = parseCompressionType(options.getCompressionType());
+    compressionType = Compression.valueOf(options.getCompressionType());
   }
 
   private static String createFilenamePattern() {
@@ -96,7 +99,8 @@ public class TFRecordIOIT extends AbstractFileBasedIOIT {
 
     writePipeline
         .apply("Generate sequence", GenerateSequence.from(0).to(numberOfTextLines))
-        .apply("Produce text lines", ParDo.of(new DeterministicallyConstructTestTextLineFn()))
+        .apply("Produce text lines",
+            ParDo.of(new FileBasedIOITHelper.DeterministicallyConstructTestTextLineFn()))
         .apply("Transform strings to bytes", MapElements.via(new StringToByteArray()))
         .apply("Write content to files", writeTransform);
 
@@ -112,7 +116,7 @@ public class TFRecordIOIT extends AbstractFileBasedIOIT {
     PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash);
 
     readPipeline.apply(Create.of(filenamePattern))
-        .apply("Delete test files", ParDo.of(new DeleteFileFn())
+        .apply("Delete test files", ParDo.of(new FileBasedIOITHelper.DeleteFileFn())
         .withSideInputs(consolidatedHashcode.apply(View.<String>asSingleton())));
     readPipeline.run().waitUntilFinish();
   }
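
After these post-review updates, the file-based tests compose with FileBasedIOITHelper through
static imports instead of inheriting from an abstract base class. Below is a minimal sketch
(hypothetical class name, not part of this commit) of the resulting setup pattern, assuming the
FileBasedIOITHelper and IOTestPipelineOptions shown in the hunks above:

    package org.apache.beam.sdk.io.common;

    import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampToPrefix;
    import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions;
    import static org.junit.Assert.assertNotNull;

    import org.apache.beam.sdk.io.Compression;
    import org.junit.BeforeClass;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.JUnit4;

    /** Hypothetical skeleton following the post-review composition style. */
    @RunWith(JUnit4.class)
    public class ExampleFileBasedIOIT {

      private static String filenamePrefix;
      private static Long numberOfTextLines;
      private static Compression compressionType;

      @BeforeClass
      public static void setup() {
        // readTestPipelineOptions() now also runs PipelineOptionsValidator, so a missing
        // --filenamePrefix (now @Validation.Required, no default) fails fast.
        IOTestPipelineOptions options = readTestPipelineOptions();

        numberOfTextLines = options.getNumberOfRecords();
        filenamePrefix = appendTimestampToPrefix(options.getFilenamePrefix());
        // parseCompressionType() was dropped; tests now call Compression.valueOf directly.
        compressionType = Compression.valueOf(options.getCompressionType());
      }

      @Test
      public void optionsAreWiredUp() {
        // Write/read/verify/delete stages would follow the TextIOIT and TFRecordIOIT patterns
        // above, reusing FileBasedIOITHelper.DeterministicallyConstructTestTextLineFn and
        // FileBasedIOITHelper.DeleteFileFn.
        assertNotNull(filenamePrefix);
        assertNotNull(numberOfTextLines);
        assertNotNull(compressionType);
      }
    }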

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.