You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/08/11 21:28:54 UTC
[1/5] incubator-beam git commit: Using IOChannelUtils to resolve file path
Repository: incubator-beam
Updated Branches:
refs/heads/master a0769ad2a -> ad449ffd5
Using IOChannelUtils to resolve file path
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d7566c53
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d7566c53
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d7566c53
Branch: refs/heads/master
Commit: d7566c53d24b76bcdd2e3d61b436edea31bdb752
Parents: a98bbb2
Author: Mark Liu <ma...@markliu-macbookpro.roam.corp.google.com>
Authored: Thu Aug 11 11:55:17 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Aug 11 13:56:58 2016 -0700
----------------------------------------------------------------------
.../src/test/java/org/apache/beam/examples/WordCountIT.java | 3 ++-
.../org/apache/beam/sdk/testing/FileChecksumMatcherTest.java | 7 +++----
2 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7566c53/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
index e438faf..f93dc2b 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
@@ -41,7 +41,8 @@ public class WordCountIT {
/**
* Options for the WordCount Integration Test.
*
- * Define expected output file checksum to verify WordCount pipeline result with customized input.
+ * <p>Define expected output file checksum to verify WordCount pipeline result
+ * with customized input.
*/
public interface WordCountITOptions extends TestPipelineOptions, WordCountOptions {
@Default.String("c04722202dee29c442b55ead54c6000693e85e77")
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7566c53/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
index eebb73e..d94ffe2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
@@ -21,9 +21,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import org.apache.beam.sdk.PipelineResult;
-
+import org.apache.beam.sdk.util.IOChannelUtils;
import com.google.common.io.Files;
-
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -99,8 +98,8 @@ public class FileChecksumMatcherTest {
FileChecksumMatcher matcher =
new FileChecksumMatcher(
"90552392c28396935fe4f123bd0b5c2d0f6260c8",
- tmpFolder.getRoot().getPath() + "/*");
+ IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"));
assertThat(pResult, matcher);
}
- }
+}
[5/5] incubator-beam git commit: [BEAM-495] Create General Verifier for File Checksum
Posted by lc...@apache.org.
[BEAM-495] Create General Verifier for File Checksum
This closes #772
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ad449ffd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ad449ffd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ad449ffd
Branch: refs/heads/master
Commit: ad449ffd54483c2baf3a334980606b27d18fe386
Parents: a0769ad d7566c5
Author: Luke Cwik <lc...@google.com>
Authored: Thu Aug 11 13:57:30 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Aug 11 13:57:30 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/examples/WordCountIT.java | 118 ++--------------
.../beam/sdk/testing/FileChecksumMatcher.java | 136 +++++++++++++++++++
.../sdk/testing/FileChecksumMatcherTest.java | 105 ++++++++++++++
3 files changed, 251 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
[2/5] incubator-beam git commit: More unit test and code style fix
Posted by lc...@apache.org.
More unit test and code style fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a98bbb26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a98bbb26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a98bbb26
Branch: refs/heads/master
Commit: a98bbb26c12f96446b314f8229d9218236f0ce06
Parents: 0b5da70
Author: Mark Liu <ma...@markliu-macbookpro.roam.corp.google.com>
Authored: Thu Aug 11 11:26:28 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Aug 11 13:56:58 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/testing/FileChecksumMatcher.java | 17 +++++-----
.../sdk/testing/FileChecksumMatcherTest.java | 34 ++++++++++++++++++--
2 files changed, 39 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a98bbb26/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
index f4bd062..303efcb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
@@ -84,13 +84,12 @@ public class FileChecksumMatcher extends TypeSafeMatcher<PipelineResult>
return actualChecksum.equals(expectedChecksum);
} catch (IOException e) {
throw new RuntimeException(
- String.format("Failed to read from path: %s", filePath));
+ String.format("Failed to read from path: %s", filePath));
}
}
private List<String> readLines(String path) throws IOException {
List<String> readData = new ArrayList<>();
-
IOChannelFactory factory = IOChannelUtils.getFactory(path);
// Match inputPath which may contains glob
@@ -100,7 +99,7 @@ public class FileChecksumMatcher extends TypeSafeMatcher<PipelineResult>
int i = 0;
for (String file : files) {
try (Reader reader =
- Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) {
+ Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) {
List<String> lines = CharStreams.readLines(reader);
readData.addAll(lines);
LOG.info(
@@ -122,16 +121,16 @@ public class FileChecksumMatcher extends TypeSafeMatcher<PipelineResult>
@Override
public void describeTo(Description description) {
description
- .appendText("Expected checksum is (")
- .appendText(expectedChecksum)
- .appendText(")");
+ .appendText("Expected checksum is (")
+ .appendText(expectedChecksum)
+ .appendText(")");
}
@Override
public void describeMismatchSafely(PipelineResult pResult, Description description) {
description
- .appendText("was (")
- .appendText(actualChecksum)
- .appendText(")");
+ .appendText("was (")
+ .appendText(actualChecksum)
+ .appendText(")");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a98bbb26/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
index 00417c7..eebb73e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
@@ -49,25 +49,39 @@ public class FileChecksumMatcherTest {
private PipelineResult pResult = Mockito.mock(PipelineResult.class);
@Test
- public void testPreconditionValidChecksumString() throws IOException{
+ public void testPreconditionChecksumIsNull() throws IOException {
String tmpPath = tmpFolder.newFile().getPath();
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(containsString("Expected valid checksum, but received"));
new FileChecksumMatcher(null, tmpPath);
+ }
+
+ @Test
+ public void testPreconditionChecksumIsEmpty() throws IOException {
+ String tmpPath = tmpFolder.newFile().getPath();
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(containsString("Expected valid checksum, but received"));
new FileChecksumMatcher("", tmpPath);
}
@Test
- public void testPreconditionValidFilePath() throws IOException {
+ public void testPreconditionFilePathIsNull() {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(containsString("Expected valid file path, but received"));
new FileChecksumMatcher("checksumString", null);
+ }
+
+ @Test
+ public void testPreconditionFilePathIsEmpty() {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(containsString("Expected valid file path, but received"));
new FileChecksumMatcher("checksumString", "");
}
@Test
- public void testChecksumVerify() throws IOException{
+ public void testMatcherVerifySingleFile() throws IOException{
File tmpFile = tmpFolder.newFile();
Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8);
FileChecksumMatcher matcher =
@@ -75,4 +89,18 @@ public class FileChecksumMatcherTest {
assertThat(pResult, matcher);
}
+
+ @Test
+ public void testMatcherVerifyMultipleFilesInOneDir() throws IOException {
+ File tmpFile1 = tmpFolder.newFile();
+ File tmpFile2 = tmpFolder.newFile();
+ Files.write("To be or not to be, ", tmpFile1, StandardCharsets.UTF_8);
+ Files.write("it is not a question.", tmpFile2, StandardCharsets.UTF_8);
+ FileChecksumMatcher matcher =
+ new FileChecksumMatcher(
+ "90552392c28396935fe4f123bd0b5c2d0f6260c8",
+ tmpFolder.getRoot().getPath() + "/*");
+
+ assertThat(pResult, matcher);
+ }
}
[4/5] incubator-beam git commit: Add output checksum to WordCountITOptions
Posted by lc...@apache.org.
Add output checksum to WordCountITOptions
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0b5da70d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0b5da70d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0b5da70d
Branch: refs/heads/master
Commit: 0b5da70d296543c00c8c4460107d1c2410c4e55f
Parents: c9a32e8
Author: Mark Liu <ma...@markliu-macbookpro.roam.corp.google.com>
Authored: Tue Aug 2 17:47:46 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Aug 11 13:56:58 2016 -0700
----------------------------------------------------------------------
.../src/test/java/org/apache/beam/examples/WordCountIT.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0b5da70d/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
index de6615c..e438faf 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
@@ -19,6 +19,7 @@
package org.apache.beam.examples;
import org.apache.beam.examples.WordCount.WordCountOptions;
+import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.FileChecksumMatcher;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -39,8 +40,13 @@ public class WordCountIT {
/**
* Options for the WordCount Integration Test.
+ *
+ * Define expected output file checksum to verify WordCount pipeline result with customized input.
*/
public interface WordCountITOptions extends TestPipelineOptions, WordCountOptions {
+ @Default.String("c04722202dee29c442b55ead54c6000693e85e77")
+ String getOutputChecksum();
+ void setOutputChecksum(String value);
}
@Test
@@ -54,8 +60,7 @@ public class WordCountIT {
"output",
"results"));
options.setOnSuccessMatcher(
- new FileChecksumMatcher("c04722202dee29c442b55ead54c6000693e85e77",
- options.getOutput() + "*"));
+ new FileChecksumMatcher(options.getOutputChecksum(), options.getOutput() + "*"));
WordCount.main(TestPipeline.convertToArgs(options));
}
[3/5] incubator-beam git commit: [BEAM-495] Create General Verifier for File Checksum
Posted by lc...@apache.org.
[BEAM-495] Create General Verifier for File Checksum
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c9a32e8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c9a32e8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c9a32e8b
Branch: refs/heads/master
Commit: c9a32e8b8b4ca182721bf81639bd2a28e53f9525
Parents: a0769ad
Author: Mark Liu <ma...@markliu-macbookpro.roam.corp.google.com>
Authored: Tue Aug 2 17:25:14 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Aug 11 13:56:58 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/examples/WordCountIT.java | 112 +--------------
.../beam/sdk/testing/FileChecksumMatcher.java | 137 +++++++++++++++++++
.../sdk/testing/FileChecksumMatcherTest.java | 78 +++++++++++
3 files changed, 219 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9a32e8b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
index c087c67..de6615c 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
@@ -18,38 +18,18 @@
package org.apache.beam.examples;
-import static com.google.common.base.Preconditions.checkArgument;
-
import org.apache.beam.examples.WordCount.WordCountOptions;
-import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.SerializableMatcher;
+import org.apache.beam.sdk.testing.FileChecksumMatcher;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
-import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
-import com.google.common.base.Strings;
-import com.google.common.hash.HashCode;
-import com.google.common.hash.Hashing;
-import com.google.common.io.CharStreams;
-
-import org.hamcrest.Description;
-import org.hamcrest.TypeSafeMatcher;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.io.Reader;
-import java.nio.channels.Channels;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.Date;
-import java.util.List;
/**
* End-to-end tests of WordCount.
@@ -73,94 +53,10 @@ public class WordCountIT {
String.format("WordCountIT-%tF-%<tH-%<tM-%<tS-%<tL", new Date()),
"output",
"results"));
- options.setOnSuccessMatcher(new WordCountOnSuccessMatcher(options.getOutput() + "*"));
+ options.setOnSuccessMatcher(
+ new FileChecksumMatcher("c04722202dee29c442b55ead54c6000693e85e77",
+ options.getOutput() + "*"));
WordCount.main(TestPipeline.convertToArgs(options));
}
-
- /**
- * Matcher for verifying WordCount output data.
- */
- static class WordCountOnSuccessMatcher extends TypeSafeMatcher<PipelineResult>
- implements SerializableMatcher<PipelineResult> {
-
- private static final Logger LOG = LoggerFactory.getLogger(WordCountOnSuccessMatcher.class);
-
- private static final String EXPECTED_CHECKSUM = "c04722202dee29c442b55ead54c6000693e85e77";
- private String actualChecksum;
-
- private final String outputPath;
-
- WordCountOnSuccessMatcher(String outputPath) {
- checkArgument(
- !Strings.isNullOrEmpty(outputPath),
- "Expected valid output path, but received %s", outputPath);
-
- this.outputPath = outputPath;
- }
-
- @Override
- protected boolean matchesSafely(PipelineResult pResult) {
- try {
- // Load output data
- List<String> outputs = readLines(outputPath);
-
- // Verify outputs. Checksum is computed using SHA-1 algorithm
- actualChecksum = hashing(outputs);
- LOG.info("Generated checksum for output data: {}", actualChecksum);
-
- return actualChecksum.equals(EXPECTED_CHECKSUM);
- } catch (IOException e) {
- throw new RuntimeException(
- String.format("Failed to read from path: %s", outputPath));
- }
- }
-
- private List<String> readLines(String path) throws IOException {
- List<String> readData = new ArrayList<>();
-
- IOChannelFactory factory = IOChannelUtils.getFactory(path);
-
- // Match inputPath which may contains glob
- Collection<String> files = factory.match(path);
-
- // Read data from file paths
- int i = 0;
- for (String file : files) {
- try (Reader reader =
- Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) {
- List<String> lines = CharStreams.readLines(reader);
- readData.addAll(lines);
- LOG.info(
- "[{} of {}] Read {} lines from file: {}", i, files.size() - 1, lines.size(), file);
- }
- i++;
- }
- return readData;
- }
-
- private String hashing(List<String> strs) {
- List<HashCode> hashCodes = new ArrayList<>();
- for (String str : strs) {
- hashCodes.add(Hashing.sha1().hashString(str, StandardCharsets.UTF_8));
- }
- return Hashing.combineUnordered(hashCodes).toString();
- }
-
- @Override
- public void describeTo(Description description) {
- description
- .appendText("Expected checksum is (")
- .appendText(EXPECTED_CHECKSUM)
- .appendText(")");
- }
-
- @Override
- protected void describeMismatchSafely(PipelineResult pResult, Description description) {
- description
- .appendText("was (")
- .appendText(actualChecksum)
- .appendText(")");
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9a32e8b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
new file mode 100644
index 0000000..f4bd062
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.testing;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.util.IOChannelFactory;
+import org.apache.beam.sdk.util.IOChannelUtils;
+
+import com.google.common.base.Strings;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.Hashing;
+import com.google.common.io.CharStreams;
+
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Matcher to verify file checksum in E2E test.
+ *
+ * <p>For example:
+ * <pre>{@code [
+ * assertTrue(job, new FileChecksumMatcher(checksumString, filePath));
+ * ]}</pre>
+ */
+public class FileChecksumMatcher extends TypeSafeMatcher<PipelineResult>
+ implements SerializableMatcher<PipelineResult> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FileChecksumMatcher.class);
+
+ private final String expectedChecksum;
+ private final String filePath;
+ private String actualChecksum;
+
+ public FileChecksumMatcher(String checksum, String filePath) {
+ checkArgument(
+ !Strings.isNullOrEmpty(checksum),
+ "Expected valid checksum, but received %s", checksum);
+ checkArgument(
+ !Strings.isNullOrEmpty(filePath),
+ "Expected valid file path, but received %s", filePath);
+
+ this.expectedChecksum = checksum;
+ this.filePath = filePath;
+ }
+
+ @Override
+ public boolean matchesSafely(PipelineResult pipelineResult) {
+ try {
+ // Load output data
+ List<String> outputs = readLines(filePath);
+
+ // Verify outputs. Checksum is computed using SHA-1 algorithm
+ actualChecksum = hashing(outputs);
+ LOG.info("Generated checksum for output data: {}", actualChecksum);
+
+ return actualChecksum.equals(expectedChecksum);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format("Failed to read from path: %s", filePath));
+ }
+ }
+
+ private List<String> readLines(String path) throws IOException {
+ List<String> readData = new ArrayList<>();
+
+ IOChannelFactory factory = IOChannelUtils.getFactory(path);
+
+ // Match inputPath which may contains glob
+ Collection<String> files = factory.match(path);
+
+ // Read data from file paths
+ int i = 0;
+ for (String file : files) {
+ try (Reader reader =
+ Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) {
+ List<String> lines = CharStreams.readLines(reader);
+ readData.addAll(lines);
+ LOG.info(
+ "[{} of {}] Read {} lines from file: {}", i, files.size() - 1, lines.size(), file);
+ }
+ i++;
+ }
+ return readData;
+ }
+
+ private String hashing(List<String> strs) {
+ List<HashCode> hashCodes = new ArrayList<>();
+ for (String str : strs) {
+ hashCodes.add(Hashing.sha1().hashString(str, StandardCharsets.UTF_8));
+ }
+ return Hashing.combineUnordered(hashCodes).toString();
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description
+ .appendText("Expected checksum is (")
+ .appendText(expectedChecksum)
+ .appendText(")");
+ }
+
+ @Override
+ public void describeMismatchSafely(PipelineResult pResult, Description description) {
+ description
+ .appendText("was (")
+ .appendText(actualChecksum)
+ .appendText(")");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9a32e8b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
new file mode 100644
index 0000000..00417c7
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+
+import org.apache.beam.sdk.PipelineResult;
+
+import com.google.common.io.Files;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/** Tests for {@link FileChecksumMatcher}. */
+@RunWith(JUnit4.class)
+public class FileChecksumMatcherTest {
+ @Rule
+ public TemporaryFolder tmpFolder = new TemporaryFolder();
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Mock
+ private PipelineResult pResult = Mockito.mock(PipelineResult.class);
+
+ @Test
+ public void testPreconditionValidChecksumString() throws IOException{
+ String tmpPath = tmpFolder.newFile().getPath();
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(containsString("Expected valid checksum, but received"));
+ new FileChecksumMatcher(null, tmpPath);
+ new FileChecksumMatcher("", tmpPath);
+ }
+
+ @Test
+ public void testPreconditionValidFilePath() throws IOException {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(containsString("Expected valid file path, but received"));
+ new FileChecksumMatcher("checksumString", null);
+ new FileChecksumMatcher("checksumString", "");
+ }
+
+ @Test
+ public void testChecksumVerify() throws IOException{
+ File tmpFile = tmpFolder.newFile();
+ Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8);
+ FileChecksumMatcher matcher =
+ new FileChecksumMatcher("a8772322f5d7b851777f820fc79d050f9d302915", tmpFile.getPath());
+
+ assertThat(pResult, matcher);
+ }
+ }