You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/07/18 23:24:33 UTC

[1/2] incubator-beam git commit: [BEAM-124] WordCountIT: Add outputs verification

Repository: incubator-beam
Updated Branches:
  refs/heads/master 11c2ec2b1 -> 52331d96c


[BEAM-124] WordCountIT: Add outputs verification


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e09eefdd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e09eefdd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e09eefdd

Branch: refs/heads/master
Commit: e09eefddfb671f3d41b31152a683ae9d92a0e193
Parents: 11c2ec2
Author: Mark Liu <ma...@markliu0.mtv.corp.google.com>
Authored: Fri Jul 8 10:50:26 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Jul 18 16:23:32 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/examples/WordCountIT.java   | 118 ++++++++++++++++++-
 .../apache/beam/sdk/testing/TestPipeline.java   |   3 +
 2 files changed, 118 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e09eefdd/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
index 503445e..6ba46d7 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
@@ -18,17 +18,38 @@
 
 package org.apache.beam.examples;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import org.apache.beam.examples.WordCount.WordCountOptions;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.SerializableMatcher;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
-import com.google.common.base.Joiner;
+import org.apache.beam.sdk.util.IOChannelFactory;
+import org.apache.beam.sdk.util.IOChannelUtils;
+
+import com.google.common.base.Strings;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.Hashing;
+import com.google.common.io.CharStreams;
 
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Date;
+import java.util.List;
 
 /**
  * End-to-end tests of WordCount.
@@ -46,9 +67,100 @@ public class WordCountIT {
   public void testE2EWordCount() throws Exception {
     PipelineOptionsFactory.register(WordCountITOptions.class);
     WordCountITOptions options = TestPipeline.testingPipelineOptions().as(WordCountITOptions.class);
-    options.setOutput(Joiner.on("/").join(new String[]{options.getTempRoot(),
-        String.format("WordCountIT-%tF-%<tH-%<tM-%<tS-%<tL", new Date()), "output", "results"}));
+
+    options.setOutput(IOChannelUtils.resolve(
+        options.getTempRoot(),
+        String.format("WordCountIT-%tF-%<tH-%<tM-%<tS-%<tL", new Date()),
+        "output",
+        "results"));
+    options.setOnSuccessMatcher(new WordCountOnSuccessMatcher(options.getOutput() + "*"));
 
     WordCount.main(TestPipeline.convertToArgs(options));
   }
+
+  /**
+   * Matcher for verifying WordCount output data.
+   */
+  static class WordCountOnSuccessMatcher extends TypeSafeMatcher<PipelineResult>
+      implements SerializableMatcher<PipelineResult> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(WordCountOnSuccessMatcher.class);
+
+    private static final String EXPECTED_CHECKSUM = "8ae94f799f97cfd1cb5e8125951b32dfb52e1f12";
+    private String actualChecksum;
+
+    private final String outputPath;
+
+    WordCountOnSuccessMatcher(String outputPath) {
+      checkArgument(
+          !Strings.isNullOrEmpty(outputPath),
+          "Expected valid output path, but received %s", outputPath);
+
+      this.outputPath = outputPath;
+    }
+
+    @Override
+    protected boolean matchesSafely(PipelineResult pResult) {
+      try {
+        // Load all lines from every file matching outputPath (the path may contain a glob).
+        List<String> outputs = readLines(outputPath);
+
+        // Verify outputs via a SHA-1 based checksum, kept in a field for the mismatch message.
+        actualChecksum = hashing(outputs);
+        LOG.info("Generated checksum for output data: {}", actualChecksum);
+
+        return actualChecksum.equals(EXPECTED_CHECKSUM);
+      } catch (IOException e) {
+        // Keep the IOException as the cause so the underlying read failure is not lost.
+        throw new RuntimeException(String.format("Failed to read from path: %s", outputPath), e);
+      }
+    }
+
+    private List<String> readLines(String path) throws IOException {
+      List<String> readData = new ArrayList<>();
+
+      IOChannelFactory factory = IOChannelUtils.getFactory(path);
+
+      // Match the given path, which may contain a glob
+      Collection<String> files = factory.match(path);
+
+      // Read data from file paths
+      int i = 0;
+      for (String file : files) {
+        try (Reader reader =
+              Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) {
+          List<String> lines = CharStreams.readLines(reader);
+          readData.addAll(lines);
+          LOG.info(
+              "[{} of {}] Read {} lines from file: {}", i, files.size() - 1, lines.size(), file);
+        }
+        i++;
+      }
+      return readData;
+    }
+
+    private String hashing(List<String> strs) {
+      List<HashCode> hashCodes = new ArrayList<>();
+      for (String str : strs) {
+        hashCodes.add(Hashing.sha1().hashString(str, StandardCharsets.UTF_8));
+      }
+      return Hashing.combineUnordered(hashCodes).toString();
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description
+          .appendText("Expected checksum is (")
+          .appendText(EXPECTED_CHECKSUM)
+          .appendText(")");
+    }
+
+    @Override
+    protected void describeMismatchSafely(PipelineResult pResult, Description description) {
+      description
+          .appendText("was (")
+          .appendText(actualChecksum)
+          .appendText(")");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e09eefdd/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index b901268..0de3024 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.TestCredential;
 
 import com.google.common.base.Optional;
@@ -155,6 +156,8 @@ public class TestPipeline extends Pipeline {
         options.as(GcpOptions.class).setGcpCredential(new TestCredential());
       }
       options.setStableUniqueNames(CheckEnabled.ERROR);
+
+      IOChannelUtils.registerStandardIOFactories(options);
       return options;
     } catch (IOException e) {
       throw new RuntimeException("Unable to instantiate test options from system property "


[2/2] incubator-beam git commit: [BEAM-124] WordCountIT: Add outputs verification

Posted by lc...@apache.org.
[BEAM-124] WordCountIT: Add outputs verification

This closes #611


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/52331d96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/52331d96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/52331d96

Branch: refs/heads/master
Commit: 52331d96cbf94c4828638977b2a148061e0664c4
Parents: 11c2ec2 e09eefd
Author: Luke Cwik <lc...@google.com>
Authored: Mon Jul 18 16:24:24 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Jul 18 16:24:24 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/examples/WordCountIT.java   | 118 ++++++++++++++++++-
 .../apache/beam/sdk/testing/TestPipeline.java   |   3 +
 2 files changed, 118 insertions(+), 3 deletions(-)
----------------------------------------------------------------------