You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/12/06 22:10:12 UTC
[beam] 01/02: [BEAM-3060] add TFRecordIOIT
This is an automated email from the ASF dual-hosted git repository.
jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 1a6c25c7c547de882b7ec4da80a5ace1d01f397d
Author: Łukasz Gajowy <lu...@polidea.com>
AuthorDate: Tue Nov 28 13:23:28 2017 -0800
[BEAM-3060] add TFRecordIOIT
---
.../beam/sdk/io/common/IOTestPipelineOptions.java | 2 +-
.../beam/sdk/io/common/AbstractFileBasedIOIT.java | 111 +++++++++++++++++
.../java/org/apache/beam/sdk/io/text/TextIOIT.java | 86 +------------
.../apache/beam/sdk/io/tfrecord/TFRecordIOIT.java | 133 +++++++++++++++++++++
4 files changed, 251 insertions(+), 81 deletions(-)
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index 5a29d4f..d919654 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -96,7 +96,7 @@ public interface IOTestPipelineOptions extends TestPipelineOptions {
void setNumberOfRecords(Long count);
@Description("Destination prefix for files generated by the test")
- @Default.String("TEXTIOIT")
+ @Default.String("FILEBASEDIOIT")
String getFilenamePrefix();
void setFilenamePrefix(String prefix);
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/AbstractFileBasedIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/AbstractFileBasedIOIT.java
new file mode 100644
index 0000000..9eb8aea
--- /dev/null
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/AbstractFileBasedIOIT.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.common;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * Abstract base class for file-based IO integration tests.
+ *
+ * <p>Collects the helpers shared by the file-based IO ITs: reading {@link IOTestPipelineOptions},
+ * building a unique filename prefix, parsing the compression option, looking up the expected
+ * hash for a given line count, and {@link DoFn}s that generate test lines and delete files.
+ */
+public abstract class AbstractFileBasedIOIT {
+
+  /**
+   * Expected hashes of the test lines emitted by {@link DeterministicallyConstructTestTextLineFn},
+   * keyed by the number of generated lines. Built once instead of on every lookup.
+   */
+  private static final Map<Long, String> EXPECTED_HASHES = ImmutableMap.of(
+      100_000L, "4c8bb3b99dcc59459b20fefba400d446",
+      1_000_000L, "9796db06e7a7960f974d5a91164afff1",
+      100_000_000L, "6ce05f456e2fdc846ded2abd0ec1de95");
+
+  /**
+   * Registers {@link IOTestPipelineOptions} and returns the testing pipeline options viewed as
+   * that interface.
+   */
+  protected static IOTestPipelineOptions readTestPipelineOptions() {
+    PipelineOptionsFactory.register(IOTestPipelineOptions.class);
+    return TestPipeline.testingPipelineOptions().as(IOTestPipelineOptions.class);
+  }
+
+  /**
+   * Returns {@code filenamePrefix} followed by an underscore and the current time in epoch
+   * milliseconds, so that separate runs write to distinct file names.
+   */
+  protected static String appendTimestampToPrefix(String filenamePrefix) {
+    return String.format("%s_%s", filenamePrefix, new Date().getTime());
+  }
+
+  /**
+   * Parses a case-insensitive compression name (e.g. {@code "gzip"}) into a {@link Compression}.
+   *
+   * @param compressionType name of a {@link Compression} constant, in any case
+   * @return the matching {@link Compression} value
+   * @throws IllegalArgumentException if the name does not match any {@link Compression} value
+   */
+  protected static Compression parseCompressionType(String compressionType) {
+    try {
+      // Locale.ROOT keeps upper-casing locale-independent; under e.g. a Turkish default locale
+      // "gzip".toUpperCase() yields "GZİP", which would not match Compression.GZIP.
+      return Compression.valueOf(compressionType.toUpperCase(Locale.ROOT));
+    } catch (IllegalArgumentException ex) {
+      throw new IllegalArgumentException(
+          String.format("Unsupported compression type: %s", compressionType), ex);
+    }
+  }
+
+  /**
+   * Returns the expected hash of the generated test lines for the given line count.
+   *
+   * @param lineCount number of lines the test generated
+   * @return the known hash for {@code lineCount}
+   * @throws UnsupportedOperationException if no hash is known for {@code lineCount}
+   */
+  protected String getExpectedHashForLineCount(Long lineCount) {
+    String hash = EXPECTED_HASHES.get(lineCount);
+    if (hash == null) {
+      throw new UnsupportedOperationException(
+          String.format("No hash for that line count: %s", lineCount));
+    }
+    return hash;
+  }
+
+  /**
+   * Maps each input seed to a single line of test text that embeds the seed.
+   */
+  public static class DeterministicallyConstructTestTextLineFn extends DoFn<Long, String> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(String.format("IO IT Test line of text. Line seed: %s", c.element()));
+    }
+  }
+
+  /**
+   * Deletes all files matching the input file pattern using the {@link FileSystems} API.
+   */
+  public static class DeleteFileFn extends DoFn<String, Void> {
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws IOException {
+      // A single pattern is matched, so exactly one MatchResult is expected back.
+      MatchResult match = Iterables
+          .getOnlyElement(FileSystems.match(Collections.singletonList(c.element())));
+
+      Collection<ResourceId> resourceIds = toResourceIds(match);
+
+      FileSystems.delete(resourceIds);
+    }
+
+    /** Extracts the resource ids of every file in {@code match}. */
+    private Collection<ResourceId> toResourceIds(MatchResult match) throws IOException {
+      return FluentIterable.from(match.metadata())
+          .transform(new Function<MatchResult.Metadata, ResourceId>() {
+            @Override
+            public ResourceId apply(MatchResult.Metadata metadata) {
+              return metadata.resourceId();
+            }
+          }).toList();
+    }
+  }
+}
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
index e9aac80..7593f85 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
@@ -20,31 +20,16 @@ package org.apache.beam.sdk.io.text;
import static org.apache.beam.sdk.io.Compression.AUTO;
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-
-import java.io.IOException;
import java.text.ParseException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Map;
-
import org.apache.beam.sdk.io.Compression;
-import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.common.AbstractFileBasedIOIT;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
-import org.apache.beam.sdk.io.fs.MatchResult;
-import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
@@ -64,16 +49,16 @@ import org.junit.runners.JUnit4;
* -Dit.test=org.apache.beam.sdk.io.text.TextIOIT
* -DintegrationTestPipelineOptions='[
* "--numberOfRecords=100000",
- * "--filenamePrefix=TEXTIOIT"
+ * "--filenamePrefix=FILEBASEDIOIT",
* "--compressionType=GZIP"
* ]'
* </pre>
* </p>
* <p>Please see 'sdks/java/io/file-based-io-tests/pom.xml' for instructions regarding
* running this test using Beam performance testing framework.</p>
- * */
+ */
@RunWith(JUnit4.class)
-public class TextIOIT {
+public class TextIOIT extends AbstractFileBasedIOIT {
private static String filenamePrefix;
private static Long numberOfTextLines;
@@ -84,28 +69,13 @@ public class TextIOIT {
@BeforeClass
public static void setup() throws ParseException {
- PipelineOptionsFactory.register(IOTestPipelineOptions.class);
- IOTestPipelineOptions options = TestPipeline.testingPipelineOptions()
- .as(IOTestPipelineOptions.class);
+ IOTestPipelineOptions options = readTestPipelineOptions();
numberOfTextLines = options.getNumberOfRecords();
- filenamePrefix = appendTimestamp(options.getFilenamePrefix());
+ filenamePrefix = appendTimestampToPrefix(options.getFilenamePrefix());
compressionType = parseCompressionType(options.getCompressionType());
}
- private static Compression parseCompressionType(String compressionType) {
- try {
- return Compression.valueOf(compressionType.toUpperCase());
- } catch (IllegalArgumentException ex) {
- throw new IllegalArgumentException(
- String.format("Unsupported compression type: %s", compressionType));
- }
- }
-
- private static String appendTimestamp(String filenamePrefix) {
- return String.format("%s_%s", filenamePrefix, new Date().getTime());
- }
-
@Test
public void writeThenReadAll() {
TextIO.TypedWrite<String, Object> write = TextIO
@@ -132,48 +102,4 @@ public class TextIOIT {
pipeline.run().waitUntilFinish();
}
-
- private static String getExpectedHashForLineCount(Long lineCount) {
- Map<Long, String> expectedHashes = ImmutableMap.of(
- 100_000L, "4c8bb3b99dcc59459b20fefba400d446",
- 1_000_000L, "9796db06e7a7960f974d5a91164afff1",
- 100_000_000L, "6ce05f456e2fdc846ded2abd0ec1de95"
- );
-
- String hash = expectedHashes.get(lineCount);
- if (hash == null) {
- throw new UnsupportedOperationException(
- String.format("No hash for that line count: %s", lineCount));
- }
- return hash;
- }
-
- private static class DeterministicallyConstructTestTextLineFn extends DoFn<Long, String> {
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(String.format("IO IT Test line of text. Line seed: %s", c.element()));
- }
- }
-
- private static class DeleteFileFn extends DoFn<String, Void> {
-
- @ProcessElement
- public void processElement(ProcessContext c) throws IOException {
- MatchResult match = Iterables
- .getOnlyElement(FileSystems.match(Collections.singletonList(c.element())));
- FileSystems.delete(toResourceIds(match));
- }
-
- private Collection<ResourceId> toResourceIds(MatchResult match) throws IOException {
- return FluentIterable.from(match.metadata())
- .transform(new Function<MatchResult.Metadata, ResourceId>() {
-
- @Override
- public ResourceId apply(MatchResult.Metadata metadata) {
- return metadata.resourceId();
- }
- }).toList();
- }
- }
}
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
new file mode 100644
index 0000000..4589942
--- /dev/null
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.tfrecord;
+
+import static org.apache.beam.sdk.io.Compression.AUTO;
+
+import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.TFRecordIO;
+import org.apache.beam.sdk.io.common.AbstractFileBasedIOIT;
+import org.apache.beam.sdk.io.common.HashingFn;
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration tests for {@link org.apache.beam.sdk.io.TFRecordIO}.
+ *
+ * <p>Run this test using the command below. Pass in connection information via PipelineOptions:
+ * <pre>
+ *  mvn -e -Pio-it verify -pl sdks/java/io/file-based-io-tests
+ *  -Dit.test=org.apache.beam.sdk.io.tfrecord.TFRecordIOIT
+ *  -DintegrationTestPipelineOptions='[
+ *  "--numberOfRecords=100000",
+ *  "--filenamePrefix=FILEBASEDIOIT",
+ *  "--compressionType=GZIP"
+ *  ]'
+ * </pre>
+ *
+ * <p>Please see 'sdks/java/io/file-based-io-tests/pom.xml' for instructions regarding
+ * running this test using Beam performance testing framework.
+ */
+@RunWith(JUnit4.class)
+public class TFRecordIOIT extends AbstractFileBasedIOIT {
+
+  private static String filenamePrefix;
+  private static Long numberOfTextLines;
+  private static Compression compressionType;
+
+  @Rule
+  public TestPipeline writePipeline = TestPipeline.create();
+
+  @Rule
+  public TestPipeline readPipeline = TestPipeline.create();
+
+  /**
+   * Reads the test options and derives the line count, a timestamped filename prefix and the
+   * compression used for writing.
+   */
+  @BeforeClass
+  public static void setup() throws ParseException {
+    IOTestPipelineOptions options = readTestPipelineOptions();
+
+    numberOfTextLines = options.getNumberOfRecords();
+    filenamePrefix = appendTimestampToPrefix(options.getFilenamePrefix());
+    compressionType = parseCompressionType(options.getCompressionType());
+  }
+
+  /** Returns a glob matching every file whose name starts with {@code filenamePrefix}. */
+  private static String createFilenamePattern() {
+    return filenamePrefix + "*";
+  }
+
+  // TODO: There are two pipelines due to: https://issues.apache.org/jira/browse/BEAM-3267
+  /**
+   * Writes deterministic text lines as TFRecords, reads them back, checks the combined hash
+   * against the expected value for the line count, and then deletes the written files.
+   */
+  @Test
+  public void writeThenReadAll() {
+    TFRecordIO.Write writeTransform = TFRecordIO
+        .write()
+        .to(filenamePrefix)
+        .withCompression(compressionType)
+        .withSuffix(".tfrecord");
+
+    writePipeline
+        .apply("Generate sequence", GenerateSequence.from(0).to(numberOfTextLines))
+        .apply("Produce text lines", ParDo.of(new DeterministicallyConstructTestTextLineFn()))
+        .apply("Transform strings to bytes", MapElements.via(new StringToByteArray()))
+        .apply("Write content to files", writeTransform);
+
+    writePipeline.run().waitUntilFinish();
+
+    String filenamePattern = createFilenamePattern();
+    PCollection<String> consolidatedHashcode = readPipeline
+        .apply(TFRecordIO.read().from(filenamePattern).withCompression(AUTO))
+        .apply("Transform bytes to strings", MapElements.via(new ByteArrayToString()))
+        .apply("Calculate hashcode", Combine.globally(new HashingFn()));
+
+    String expectedHash = getExpectedHashForLineCount(numberOfTextLines);
+    PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash);
+
+    // The hash is passed as a side input so the delete step waits for it to be computed.
+    readPipeline.apply(Create.of(filenamePattern))
+        .apply("Delete test files", ParDo.of(new DeleteFileFn())
+            .withSideInputs(consolidatedHashcode.apply(View.<String>asSingleton())));
+    readPipeline.run().waitUntilFinish();
+  }
+
+  /** Encodes each string as UTF-8 bytes (independent of the JVM default charset). */
+  static class StringToByteArray extends SimpleFunction<String, byte[]> {
+    @Override
+    public byte[] apply(String input) {
+      return input.getBytes(StandardCharsets.UTF_8);
+    }
+  }
+
+  /** Decodes UTF-8 bytes back into a string, mirroring {@link StringToByteArray}. */
+  static class ByteArrayToString extends SimpleFunction<byte[], String> {
+    @Override
+    public String apply(byte[] input) {
+      return new String(input, StandardCharsets.UTF_8);
+    }
+  }
+}
--
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.