You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/12/06 22:10:12 UTC

[beam] 01/02: [BEAM-3060] add TFRecordIOIT

This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 1a6c25c7c547de882b7ec4da80a5ace1d01f397d
Author: Łukasz Gajowy <lu...@polidea.com>
AuthorDate: Tue Nov 28 13:23:28 2017 -0800

    [BEAM-3060] add TFRecordIOIT
---
 .../beam/sdk/io/common/IOTestPipelineOptions.java  |   2 +-
 .../beam/sdk/io/common/AbstractFileBasedIOIT.java  | 111 +++++++++++++++++
 .../java/org/apache/beam/sdk/io/text/TextIOIT.java |  86 +------------
 .../apache/beam/sdk/io/tfrecord/TFRecordIOIT.java  | 133 +++++++++++++++++++++
 4 files changed, 251 insertions(+), 81 deletions(-)

diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index 5a29d4f..d919654 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -96,7 +96,7 @@ public interface IOTestPipelineOptions extends TestPipelineOptions {
   void setNumberOfRecords(Long count);
 
   @Description("Destination prefix for files generated by the test")
-  @Default.String("TEXTIOIT")
+  @Default.String("FILEBASEDIOIT")
   String getFilenamePrefix();
 
   void setFilenamePrefix(String prefix);
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/AbstractFileBasedIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/AbstractFileBasedIOIT.java
new file mode 100644
index 0000000..9eb8aea
--- /dev/null
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/AbstractFileBasedIOIT.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.common;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Map;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * Abstract class for file based IO integration tests. Provides the option parsing, expected
+ * hashes and helper {@link DoFn}s that the concrete tests share.
+ */
+public abstract class AbstractFileBasedIOIT {
+
+  /** Expected hash for each supported line count. */
+  private static final Map<Long, String> EXPECTED_HASHES = ImmutableMap.of(
+      100_000L, "4c8bb3b99dcc59459b20fefba400d446",
+      1_000_000L, "9796db06e7a7960f974d5a91164afff1",
+      100_000_000L, "6ce05f456e2fdc846ded2abd0ec1de95");
+
+  /** Registers {@link IOTestPipelineOptions} and returns the testing options as that type. */
+  protected static IOTestPipelineOptions readTestPipelineOptions() {
+    PipelineOptionsFactory.register(IOTestPipelineOptions.class);
+    return TestPipeline.testingPipelineOptions().as(IOTestPipelineOptions.class);
+  }
+
+  /** Returns {@code filenamePrefix} followed by "_" and the current time in milliseconds. */
+  protected static String appendTimestampToPrefix(String filenamePrefix) {
+    return String.format("%s_%s", filenamePrefix, new Date().getTime());
+  }
+
+  /** Resolves a case-insensitive name to a {@link Compression}; unknown names are rejected. */
+  protected static Compression parseCompressionType(String compressionType) {
+    try {
+      // Locale.ROOT keeps the upper-casing independent of the JVM's default locale.
+      return Compression.valueOf(compressionType.toUpperCase(java.util.Locale.ROOT));
+    } catch (IllegalArgumentException ex) {
+      throw new IllegalArgumentException(
+          String.format("Unsupported compression type: %s", compressionType), ex);
+    }
+  }
+
+  /** Returns the expected hash for {@code lineCount}; throws if no hash is recorded for it. */
+  protected String getExpectedHashForLineCount(Long lineCount) {
+    String hash = EXPECTED_HASHES.get(lineCount);
+    if (hash == null) {
+      throw new UnsupportedOperationException(
+          String.format("No hash for that line count: %s", lineCount));
+    }
+    return hash;
+  }
+
+  /** Constructs text lines in files used for testing; each line embeds its input element. */
+  public static class DeterministicallyConstructTestTextLineFn extends DoFn<Long, String> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(String.format("IO IT Test line of text. Line seed: %s", c.element()));
+    }
+  }
+
+  /** Deletes matching files using the FileSystems API. */
+  public static class DeleteFileFn extends DoFn<String, Void> {
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws IOException {
+      MatchResult match = Iterables
+          .getOnlyElement(FileSystems.match(Collections.singletonList(c.element())));
+      FileSystems.delete(toResourceIds(match));
+    }
+
+    /** Extracts the resource id of every metadata entry in {@code match}. */
+    private Collection<ResourceId> toResourceIds(MatchResult match) throws IOException {
+      return FluentIterable.from(match.metadata())
+          .transform(new Function<MatchResult.Metadata, ResourceId>() {
+            @Override
+            public ResourceId apply(MatchResult.Metadata metadata) {
+              return metadata.resourceId();
+            }
+          }).toList();
+    }
+  }
+}
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
index e9aac80..7593f85 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
@@ -20,31 +20,16 @@ package org.apache.beam.sdk.io.text;
 
 import static org.apache.beam.sdk.io.Compression.AUTO;
 
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-
-import java.io.IOException;
 import java.text.ParseException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Map;
-
 import org.apache.beam.sdk.io.Compression;
-import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.common.AbstractFileBasedIOIT;
 import org.apache.beam.sdk.io.common.HashingFn;
 import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
-import org.apache.beam.sdk.io.fs.MatchResult;
-import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.View;
@@ -64,16 +49,16 @@ import org.junit.runners.JUnit4;
  *  -Dit.test=org.apache.beam.sdk.io.text.TextIOIT
  *  -DintegrationTestPipelineOptions='[
  *  "--numberOfRecords=100000",
- *  "--filenamePrefix=TEXTIOIT"
+ *  "--filenamePrefix=FILEBASEDIOIT",
  *  "--compressionType=GZIP"
  *  ]'
  * </pre>
  * </p>
  * <p>Please see 'sdks/java/io/file-based-io-tests/pom.xml' for instructions regarding
  * running this test using Beam performance testing framework.</p>
- * */
+ */
 @RunWith(JUnit4.class)
-public class TextIOIT {
+public class TextIOIT extends AbstractFileBasedIOIT {
 
   private static String filenamePrefix;
   private static Long numberOfTextLines;
@@ -84,28 +69,13 @@ public class TextIOIT {
 
   @BeforeClass
   public static void setup() throws ParseException {
-    PipelineOptionsFactory.register(IOTestPipelineOptions.class);
-    IOTestPipelineOptions options = TestPipeline.testingPipelineOptions()
-        .as(IOTestPipelineOptions.class);
+    IOTestPipelineOptions options = readTestPipelineOptions();
 
     numberOfTextLines = options.getNumberOfRecords();
-    filenamePrefix = appendTimestamp(options.getFilenamePrefix());
+    filenamePrefix = appendTimestampToPrefix(options.getFilenamePrefix());
     compressionType = parseCompressionType(options.getCompressionType());
   }
 
-  private static Compression parseCompressionType(String compressionType) {
-    try {
-      return Compression.valueOf(compressionType.toUpperCase());
-    } catch (IllegalArgumentException ex) {
-      throw new IllegalArgumentException(
-          String.format("Unsupported compression type: %s", compressionType));
-    }
-  }
-
-  private static String appendTimestamp(String filenamePrefix) {
-    return String.format("%s_%s", filenamePrefix, new Date().getTime());
-  }
-
   @Test
   public void writeThenReadAll() {
     TextIO.TypedWrite<String, Object> write = TextIO
@@ -132,48 +102,4 @@ public class TextIOIT {
 
     pipeline.run().waitUntilFinish();
   }
-
-  private static String getExpectedHashForLineCount(Long lineCount) {
-    Map<Long, String> expectedHashes = ImmutableMap.of(
-        100_000L, "4c8bb3b99dcc59459b20fefba400d446",
-        1_000_000L, "9796db06e7a7960f974d5a91164afff1",
-        100_000_000L, "6ce05f456e2fdc846ded2abd0ec1de95"
-    );
-
-    String hash = expectedHashes.get(lineCount);
-    if (hash == null) {
-      throw new UnsupportedOperationException(
-          String.format("No hash for that line count: %s", lineCount));
-    }
-    return hash;
-  }
-
-  private static class DeterministicallyConstructTestTextLineFn extends DoFn<Long, String> {
-
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      c.output(String.format("IO IT Test line of text. Line seed: %s", c.element()));
-    }
-  }
-
-  private static class DeleteFileFn extends DoFn<String, Void> {
-
-    @ProcessElement
-    public void processElement(ProcessContext c) throws IOException {
-      MatchResult match = Iterables
-          .getOnlyElement(FileSystems.match(Collections.singletonList(c.element())));
-      FileSystems.delete(toResourceIds(match));
-    }
-
-    private Collection<ResourceId> toResourceIds(MatchResult match) throws IOException {
-      return FluentIterable.from(match.metadata())
-          .transform(new Function<MatchResult.Metadata, ResourceId>() {
-
-            @Override
-            public ResourceId apply(MatchResult.Metadata metadata) {
-              return metadata.resourceId();
-            }
-          }).toList();
-    }
-  }
 }
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
new file mode 100644
index 0000000..4589942
--- /dev/null
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.tfrecord;
+
+import static org.apache.beam.sdk.io.Compression.AUTO;
+
+import java.text.ParseException;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.TFRecordIO;
+import org.apache.beam.sdk.io.common.AbstractFileBasedIOIT;
+import org.apache.beam.sdk.io.common.HashingFn;
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration tests for {@link org.apache.beam.sdk.io.TFRecordIO}.
+ *
+ * <p>Run this test using the command below. Pass in connection information via PipelineOptions:
+ * <pre>
+ *  mvn -e -Pio-it verify -pl sdks/java/io/file-based-io-tests
+ *  -Dit.test=org.apache.beam.sdk.io.tfrecord.TFRecordIOIT
+ *  -DintegrationTestPipelineOptions='[
+ *  "--numberOfRecords=100000",
+ *  "--filenamePrefix=FILEBASEDIOIT",
+ *  "--compressionType=GZIP"
+ *  ]'
+ * </pre>
+ * <p>Please see 'sdks/java/io/file-based-io-tests/pom.xml' for instructions regarding
+ * running this test using Beam performance testing framework.</p>
+ */
+@RunWith(JUnit4.class)
+public class TFRecordIOIT extends AbstractFileBasedIOIT {
+
+  private static String filenamePrefix;
+  private static Long numberOfTextLines;
+  private static Compression compressionType;
+
+  @Rule
+  public TestPipeline writePipeline = TestPipeline.create();
+
+  @Rule
+  public TestPipeline readPipeline = TestPipeline.create();
+
+  /** Reads the record count, timestamped filename prefix and compression from the options. */
+  @BeforeClass
+  public static void setup() throws ParseException {
+    IOTestPipelineOptions options = readTestPipelineOptions();
+    numberOfTextLines = options.getNumberOfRecords();
+    filenamePrefix = appendTimestampToPrefix(options.getFilenamePrefix());
+    compressionType = parseCompressionType(options.getCompressionType());
+  }
+
+  private static String createFilenamePattern() {
+    return filenamePrefix + "*";
+  }
+
+  // TODO: There are two pipelines due to: https://issues.apache.org/jira/browse/BEAM-3267
+  @Test
+  public void writeThenReadAll() {
+    TFRecordIO.Write writeTransform = TFRecordIO.write()
+        .to(filenamePrefix)
+        .withCompression(compressionType)
+        .withSuffix(".tfrecord");
+
+    writePipeline
+        .apply("Generate sequence", GenerateSequence.from(0).to(numberOfTextLines))
+        .apply("Produce text lines", ParDo.of(new DeterministicallyConstructTestTextLineFn()))
+        .apply("Transform strings to bytes", MapElements.via(new StringToByteArray()))
+        .apply("Write content to files", writeTransform);
+    writePipeline.run().waitUntilFinish();
+
+    // Read everything matching the prefix back and combine it into one hashcode.
+    String filenamePattern = createFilenamePattern();
+    PCollection<String> consolidatedHashcode = readPipeline
+        .apply(TFRecordIO.read().from(filenamePattern).withCompression(AUTO))
+        .apply("Transform bytes to strings", MapElements.via(new ByteArrayToString()))
+        .apply("Calculate hashcode", Combine.globally(new HashingFn()));
+    String expectedHash = getExpectedHashForLineCount(numberOfTextLines);
+    PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash);
+
+    // DeleteFileFn ignores the side input; presumably it only sequences deletion after the read.
+    readPipeline.apply(Create.of(filenamePattern))
+        .apply("Delete test files", ParDo.of(new DeleteFileFn())
+            .withSideInputs(consolidatedHashcode.apply(View.<String>asSingleton())));
+    readPipeline.run().waitUntilFinish();
+  }
+
+  /** Encodes a string as UTF-8 bytes, independent of the platform default charset. */
+  static class StringToByteArray extends SimpleFunction<String, byte[]> {
+    @Override
+    public byte[] apply(String input) {
+      return input.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+    }
+  }
+
+  /** Decodes UTF-8 bytes into a string, independent of the platform default charset. */
+  static class ByteArrayToString extends SimpleFunction<byte[], String> {
+    @Override
+    public String apply(byte[] input) {
+      return new String(input, java.nio.charset.StandardCharsets.UTF_8);
+    }
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.