You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/07/18 23:24:33 UTC
[1/2] incubator-beam git commit: [BEAM-124] WordCountIT: Add outputs
verification
Repository: incubator-beam
Updated Branches:
refs/heads/master 11c2ec2b1 -> 52331d96c
[BEAM-124] WordCountIT: Add outputs verification
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e09eefdd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e09eefdd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e09eefdd
Branch: refs/heads/master
Commit: e09eefddfb671f3d41b31152a683ae9d92a0e193
Parents: 11c2ec2
Author: Mark Liu <ma...@markliu0.mtv.corp.google.com>
Authored: Fri Jul 8 10:50:26 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Jul 18 16:23:32 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/examples/WordCountIT.java | 118 ++++++++++++++++++-
.../apache/beam/sdk/testing/TestPipeline.java | 3 +
2 files changed, 118 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e09eefdd/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
index 503445e..6ba46d7 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
@@ -18,17 +18,38 @@
package org.apache.beam.examples;
+import static com.google.common.base.Preconditions.checkArgument;
+
import org.apache.beam.examples.WordCount.WordCountOptions;
+import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.SerializableMatcher;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
-import com.google.common.base.Joiner;
+import org.apache.beam.sdk.util.IOChannelFactory;
+import org.apache.beam.sdk.util.IOChannelUtils;
+
+import com.google.common.base.Strings;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.Hashing;
+import com.google.common.io.CharStreams;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Date;
+import java.util.List;
/**
* End-to-end tests of WordCount.
@@ -46,9 +67,100 @@ public class WordCountIT {
public void testE2EWordCount() throws Exception {
PipelineOptionsFactory.register(WordCountITOptions.class);
WordCountITOptions options = TestPipeline.testingPipelineOptions().as(WordCountITOptions.class);
- options.setOutput(Joiner.on("/").join(new String[]{options.getTempRoot(),
- String.format("WordCountIT-%tF-%<tH-%<tM-%<tS-%<tL", new Date()), "output", "results"}));
+
+ options.setOutput(IOChannelUtils.resolve(
+ options.getTempRoot(),
+ String.format("WordCountIT-%tF-%<tH-%<tM-%<tS-%<tL", new Date()),
+ "output",
+ "results"));
+ options.setOnSuccessMatcher(new WordCountOnSuccessMatcher(options.getOutput() + "*"));
WordCount.main(TestPipeline.convertToArgs(options));
}
+
+ /**
+  * Matcher for verifying WordCount output data.
+  *
+  * <p>Reads every line of the files matching {@code outputPath}, hashes each line with SHA-1,
+  * combines the per-line hashes without regard to order and compares the result with
+  * {@link #EXPECTED_CHECKSUM}.
+  */
+ static class WordCountOnSuccessMatcher extends TypeSafeMatcher<PipelineResult>
+     implements SerializableMatcher<PipelineResult> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(WordCountOnSuccessMatcher.class);
+
+   /** Checksum the combined output lines are expected to produce. */
+   private static final String EXPECTED_CHECKSUM = "8ae94f799f97cfd1cb5e8125951b32dfb52e1f12";
+
+   /** Checksum computed by the last call to {@link #matchesSafely}; used in mismatch text. */
+   private String actualChecksum;
+
+   /** Path (possibly containing a glob) of the output files to verify. */
+   private final String outputPath;
+
+   /**
+    * Creates a matcher that verifies the files found at {@code outputPath}.
+    *
+    * @param outputPath non-empty path of the output files; may contain a glob
+    * @throws IllegalArgumentException if {@code outputPath} is null or empty
+    */
+   WordCountOnSuccessMatcher(String outputPath) {
+     checkArgument(
+         !Strings.isNullOrEmpty(outputPath),
+         "Expected valid output path, but received %s", outputPath);
+
+     this.outputPath = outputPath;
+   }
+
+   /**
+    * Reads the output files and compares their checksum with the expected one.
+    *
+    * @param pResult the pipeline result being matched; not inspected by this matcher
+    * @return {@code true} if the computed checksum equals {@link #EXPECTED_CHECKSUM}
+    * @throws RuntimeException if the output files cannot be read
+    */
+   @Override
+   protected boolean matchesSafely(PipelineResult pResult) {
+     try {
+       // Load output data
+       List<String> outputs = readLines(outputPath);
+
+       // Verify outputs. Checksum is computed using SHA-1 algorithm
+       actualChecksum = hashing(outputs);
+       LOG.info("Generated checksum for output data: {}", actualChecksum);
+
+       return actualChecksum.equals(EXPECTED_CHECKSUM);
+     } catch (IOException e) {
+       // Keep the IOException as the cause so the underlying I/O failure stays diagnosable.
+       throw new RuntimeException(
+           String.format("Failed to read from path: %s", outputPath), e);
+     }
+   }
+
+   /**
+    * Reads all lines from every file matching {@code path}.
+    *
+    * @param path file path, which may contain a glob
+    * @return the lines of all matched files, in the order the files were read
+    * @throws IOException if matching or reading a file fails
+    */
+   private List<String> readLines(String path) throws IOException {
+     List<String> readData = new ArrayList<>();
+
+     IOChannelFactory factory = IOChannelUtils.getFactory(path);
+
+     // Match the input path, which may contain a glob
+     Collection<String> files = factory.match(path);
+
+     // Read data from file paths
+     int i = 0;
+     for (String file : files) {
+       try (Reader reader =
+           Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) {
+         List<String> lines = CharStreams.readLines(reader);
+         readData.addAll(lines);
+         // Progress is logged zero-based: "[index of lastIndex]".
+         LOG.info(
+             "[{} of {}] Read {} lines from file: {}", i, files.size() - 1, lines.size(), file);
+       }
+       i++;
+     }
+     return readData;
+   }
+
+   /**
+    * Computes a checksum over the given strings that does not depend on their order.
+    *
+    * @param strs the strings to hash; may be empty
+    * @return the hex form of the combined SHA-1 hashes, or the SHA-1 of the empty string
+    *     when {@code strs} is empty
+    */
+   private String hashing(List<String> strs) {
+     if (strs.isEmpty()) {
+       // Hashing.combineUnordered rejects empty input with IllegalArgumentException. Hash the
+       // empty string instead so missing output is reported as a checksum mismatch.
+       return Hashing.sha1().hashString("", StandardCharsets.UTF_8).toString();
+     }
+
+     List<HashCode> hashCodes = new ArrayList<>();
+     for (String str : strs) {
+       hashCodes.add(Hashing.sha1().hashString(str, StandardCharsets.UTF_8));
+     }
+     return Hashing.combineUnordered(hashCodes).toString();
+   }
+
+   /** Describes the expected checksum. */
+   @Override
+   public void describeTo(Description description) {
+     description
+         .appendText("Expected checksum is (")
+         .appendText(EXPECTED_CHECKSUM)
+         .appendText(")");
+   }
+
+   /** Describes the checksum that was actually computed from the output files. */
+   @Override
+   protected void describeMismatchSafely(PipelineResult pResult, Description description) {
+     description
+         .appendText("was (")
+         .appendText(actualChecksum)
+         .appendText(")");
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e09eefdd/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index b901268..0de3024 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.TestCredential;
import com.google.common.base.Optional;
@@ -155,6 +156,8 @@ public class TestPipeline extends Pipeline {
options.as(GcpOptions.class).setGcpCredential(new TestCredential());
}
options.setStableUniqueNames(CheckEnabled.ERROR);
+
+ IOChannelUtils.registerStandardIOFactories(options);
return options;
} catch (IOException e) {
throw new RuntimeException("Unable to instantiate test options from system property "
[2/2] incubator-beam git commit: [BEAM-124] WordCountIT: Add outputs
verification
Posted by lc...@apache.org.
[BEAM-124] WordCountIT: Add outputs verification
This closes #611
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/52331d96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/52331d96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/52331d96
Branch: refs/heads/master
Commit: 52331d96cbf94c4828638977b2a148061e0664c4
Parents: 11c2ec2 e09eefd
Author: Luke Cwik <lc...@google.com>
Authored: Mon Jul 18 16:24:24 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Jul 18 16:24:24 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/examples/WordCountIT.java | 118 ++++++++++++++++++-
.../apache/beam/sdk/testing/TestPipeline.java | 3 +
2 files changed, 118 insertions(+), 3 deletions(-)
----------------------------------------------------------------------