You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/17 20:31:01 UTC

[10/50] [abbrv] beam git commit: [BEAM-3060] Adds TextIOIT for DirectRunner and local filesystem

[BEAM-3060] Adds TextIOIT for DirectRunner and local filesystem

This is one of several commits resolving BEAM-3060. Currently only the local filesystem,
relatively small datasets, and the DirectRunner are supported. Support for more runners and
filesystems, and for testing larger (gigabyte-scale) datasets, will be added soon in follow-up commits.

See: https://docs.google.com/document/d/1dA-5s6OHiP_cz-NRAbwapoKF5MEC1wKps4A5tFbIPKE/edit#


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fb4b6d3a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fb4b6d3a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fb4b6d3a

Branch: refs/heads/tez-runner
Commit: fb4b6d3a6a6ece1d8d9f4103708fde5595c12ad4
Parents: 51c938d
Author: Łukasz Gajowy <lu...@polidea.com>
Authored: Tue Oct 31 10:25:22 2017 +0100
Committer: chamikara@google.com <ch...@google.com>
Committed: Fri Nov 10 17:15:07 2017 -0800

----------------------------------------------------------------------
 .../sdk/io/common/IOTestPipelineOptions.java    |  12 ++
 sdks/java/io/file-based-io-tests/pom.xml        |  72 +++++++++
 .../org/apache/beam/sdk/io/text/TextIOIT.java   | 146 +++++++++++++++++++
 sdks/java/io/pom.xml                            |   1 +
 4 files changed, 231 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fb4b6d3a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index 256c94d..91b3aa6 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -88,4 +88,16 @@ public interface IOTestPipelineOptions extends TestPipelineOptions {
   Integer getCassandraPort();
   void setCassandraPort(Integer port);
 
+  /* Options for file-based I/O integration tests in 'sdks/java/io/file-based-io-tests/'. */
+  @Description("Number of records that will be written and read by the test")
+  @Default.Long(100000)
+  Long getNumberOfRecords();
+
+  void setNumberOfRecords(Long count);
+
+  @Description(value = "Destination prefix for files generated by the test")
+  @Default.String(value = "TEXTIOIT")
+  String getFilenamePrefix();
+  // Setter paired with getFilenamePrefix(); populated from the --filenamePrefix flag.
+  void setFilenamePrefix(String filenamePrefix);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/fb4b6d3a/sdks/java/io/file-based-io-tests/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/file-based-io-tests/pom.xml b/sdks/java/io/file-based-io-tests/pom.xml
new file mode 100644
index 0000000..ae7527c
--- /dev/null
+++ b/sdks/java/io/file-based-io-tests/pom.xml
@@ -0,0 +1,72 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-io-parent</artifactId>
+        <version>2.3.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>beam-sdks-java-io-file-based-io-tests</artifactId>
+    <name>Apache Beam :: SDKs :: Java :: IO :: File-based-io-tests</name>
+    <description>Integration tests for reading/writing using file-based sources/sinks.</description>
+    <!-- All dependencies are test-scoped: this module only contains integration tests. -->
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-sdks-java-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency> <!-- DirectRunner: currently the only runner these tests support. -->
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-runners-direct-java</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency> <!-- io-common test-jar: provides IOTestPipelineOptions (io/common/src/test). -->
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-sdks-java-io-common</artifactId>
+            <scope>test</scope>
+            <classifier>tests</classifier>
+        </dependency>
+        <dependency> <!-- io-common main artifact. NOTE(review): presumably needed by the test-jar classes; confirm. -->
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-sdks-java-io-common</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/fb4b6d3a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
new file mode 100644
index 0000000..ecab1d8
--- /dev/null
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.text;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.text.ParseException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Map;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.common.HashingFn;
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * An integration test for {@link org.apache.beam.sdk.io.TextIO}.
+ *
+ * <p>Run this test using the command below (numberOfRecords must be 100000 or 1000000):
+ * <pre>
+ *  mvn -e -Pio-it verify -pl sdks/java/io/file-based-io-tests -DintegrationTestPipelineOptions='[
+ *  "--numberOfRecords=100000",
+ *  "--filenamePrefix=TEXTIOIT"
+ *  ]'
+ * </pre>
+ */
+@RunWith(JUnit4.class)
+public class TextIOIT {
+
+  private static final Map<Long, String> EXPECTED_HASHES = ImmutableMap.of(
+      100_000L, "4c8bb3b99dcc59459b20fefba400d446",
+      1_000_000L, "9796db06e7a7960f974d5a91164afff1");
+
+  private static String filenamePrefix;
+  private static Long numberOfTextLines;
+
+  @Rule
+  public TestPipeline pipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void setup() throws ParseException {
+    PipelineOptionsFactory.register(IOTestPipelineOptions.class);
+    IOTestPipelineOptions options = TestPipeline.testingPipelineOptions()
+        .as(IOTestPipelineOptions.class);
+
+    numberOfTextLines = options.getNumberOfRecords();
+    filenamePrefix = appendTimestamp(options.getFilenamePrefix());
+  }
+
+  private static String appendTimestamp(String filenamePrefix) {
+    return String.format("%s_%s", filenamePrefix, new Date().getTime());
+  }
+
+  @Test
+  public void writeThenReadAll() {
+    // Look up the hash before building the pipeline so an unsupported count fails fast.
+    String expectedHash = getExpectedHashForLineCount(numberOfTextLines);
+    PCollection<String> testFilenames = pipeline
+        .apply("Generate sequence", GenerateSequence.from(0).to(numberOfTextLines))
+        .apply("Produce text lines", ParDo.of(new DeterministicallyConstructTestTextLineFn()))
+        .apply("Write content to files", TextIO.write().to(filenamePrefix).withOutputFilenames())
+        .getPerDestinationOutputFilenames().apply(Values.<String>create());
+
+    PCollection<String> consolidatedHashcode = testFilenames
+        .apply("Read all files", TextIO.readAll())
+        .apply("Calculate hashcode", Combine.globally(new HashingFn()));
+    PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash);
+
+    // The hash is an unread side input: it only delays deletion until all files are hashed.
+    testFilenames.apply("Delete test files", ParDo.of(new DeleteFileFn())
+        .withSideInputs(consolidatedHashcode.apply(View.<String>asSingleton())));
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  private static String getExpectedHashForLineCount(Long lineCount) {
+    String hash = EXPECTED_HASHES.get(lineCount);
+    if (hash == null) {
+      throw new UnsupportedOperationException(
+          String.format("No hash for that line count: %s", lineCount));
+    }
+    return hash;
+  }
+
+  private static class DeterministicallyConstructTestTextLineFn extends DoFn<Long, String> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(String.format("IO IT Test line of text. Line seed: %s", c.element()));
+    }
+  }
+
+  private static class DeleteFileFn extends DoFn<String, Void> {
+    @ProcessElement
+    public void processElement(ProcessContext c) throws IOException {
+      MatchResult match = Iterables
+          .getOnlyElement(FileSystems.match(Collections.singletonList(c.element())));
+      FileSystems.delete(toResourceIds(match));
+    }
+
+    private Collection<ResourceId> toResourceIds(MatchResult match) throws IOException {
+      return FluentIterable.from(match.metadata())
+          .transform(new Function<MatchResult.Metadata, ResourceId>() {
+            @Override
+            public ResourceId apply(MatchResult.Metadata metadata) {
+              return metadata.resourceId();
+            }
+          }).toList();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/fb4b6d3a/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 99936a2..0f8bc78 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -45,6 +45,7 @@
     <module>common</module>
     <module>elasticsearch</module>
     <module>elasticsearch-tests</module>
+    <module>file-based-io-tests</module>
     <module>google-cloud-platform</module>
     <module>hadoop-common</module>
     <module>hadoop-file-system</module>