You are viewing a plain-text version of this content. The canonical link to it was not preserved in this plain-text version.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/30 17:53:19 UTC
[1/2] incubator-beam git commit: [BEAM-747] Fix FileChecksumMatcher That Inconsistent With FS
Repository: incubator-beam
Updated Branches:
refs/heads/master 8042d52fc -> b75a76459
[BEAM-747] Fix FileChecksumMatcher That Inconsistent With FS
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d08a9f12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d08a9f12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d08a9f12
Branch: refs/heads/master
Commit: d08a9f1278e0bb4ec5b08e11f6267c516c8ea56e
Parents: 8042d52
Author: Mark Liu <ma...@markliu0.mtv.corp.google.com>
Authored: Tue Oct 25 14:57:13 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Nov 30 09:52:34 2016 -0800
----------------------------------------------------------------------
.../org/apache/beam/examples/WordCountIT.java | 2 +-
.../beam/sdk/testing/FileChecksumMatcher.java | 168 ++++++++++++++++---
.../sdk/testing/FileChecksumMatcherTest.java | 131 ++++++++++++++-
3 files changed, 268 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d08a9f12/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
index f2afe6a..01438de 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
@@ -65,7 +65,7 @@ public class WordCountIT {
"output",
"results"));
options.setOnSuccessMatcher(
- new FileChecksumMatcher(DEFAULT_OUTPUT_CHECKSUM, options.getOutput() + "*"));
+ new FileChecksumMatcher(DEFAULT_OUTPUT_CHECKSUM, options.getOutput() + "*-of-*"));
WordCount.main(TestPipeline.convertToArgs(options));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d08a9f12/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
index de6cea3..4b249fe 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
@@ -19,8 +19,14 @@
package org.apache.beam.sdk.testing;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import com.google.common.io.CharStreams;
@@ -28,14 +34,21 @@ import java.io.IOException;
import java.io.Reader;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.annotation.Nonnull;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeMatcher;
+import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,71 +56,172 @@ import org.slf4j.LoggerFactory;
* Matcher to verify file checksum in E2E test.
*
* <p>For example:
- * <pre>{@code [
- * assertTrue(job, new FileChecksumMatcher(checksumString, filePath));
- * ]}</pre>
+ * <pre>{@code
+ * assertThat(job, new FileChecksumMatcher(checksumString, filePath));
+ * }</pre>
+ * or
+ * <pre>{@code
+ * assertThat(job, new FileChecksumMatcher(checksumString, filePath, shardTemplate));
+ * }</pre>
+ *
+ * <p>Checksum of outputs is generated based on SHA-1 algorithm. If output file is empty,
+ * SHA-1 hash of empty string (da39a3ee5e6b4b0d3255bfef95601890afd80709) is used as expected.
*/
public class FileChecksumMatcher extends TypeSafeMatcher<PipelineResult>
implements SerializableMatcher<PipelineResult> {
private static final Logger LOG = LoggerFactory.getLogger(FileChecksumMatcher.class);
+ static final int MAX_READ_RETRIES = 4;
+ static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L);
+ static final FluentBackoff BACK_OFF_FACTORY =
+ FluentBackoff.DEFAULT
+ .withInitialBackoff(DEFAULT_SLEEP_DURATION)
+ .withMaxRetries(MAX_READ_RETRIES);
+
+ private static final Pattern DEFAULT_SHARD_TEMPLATE =
+ Pattern.compile("(?x) \\S* (?<shardnum> \\d+) -of- (?<numshards> \\d+)");
+
private final String expectedChecksum;
private final String filePath;
+ private final Pattern shardTemplate;
private String actualChecksum;
+ /**
+ * Constructor that uses default shard template.
+ *
+ * @param checksum expected checksum string used to verify file content.
+ * @param filePath path of files that's to be verified.
+ */
public FileChecksumMatcher(String checksum, String filePath) {
+ this(checksum, filePath, DEFAULT_SHARD_TEMPLATE);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param checksum expected checksum string used to verify file content.
+ * @param filePath path of files that's to be verified.
+ * @param shardTemplate template of shard name to parse out the total number of shards
+ * which is used in I/O retry to avoid inconsistency of filesystem.
+ * Customized template should assign name "numshards" to capturing
+ * group - total shard number.
+ */
+ public FileChecksumMatcher(String checksum, String filePath, Pattern shardTemplate) {
checkArgument(
!Strings.isNullOrEmpty(checksum),
"Expected valid checksum, but received %s", checksum);
checkArgument(
!Strings.isNullOrEmpty(filePath),
"Expected valid file path, but received %s", filePath);
+ checkNotNull(
+ shardTemplate,
+ "Expected non-null shard pattern. "
+ + "Please call the other constructor to use default pattern: %s",
+ DEFAULT_SHARD_TEMPLATE);
this.expectedChecksum = checksum;
this.filePath = filePath;
+ this.shardTemplate = shardTemplate;
}
@Override
public boolean matchesSafely(PipelineResult pipelineResult) {
+ // Load output data
+ List<String> outputs;
try {
- // Load output data
- List<String> outputs = readLines(filePath);
+ outputs = readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff());
+ } catch (Exception e) {
+ throw new RuntimeException(String.format("Failed to read from: %s", filePath), e);
+ }
- // Verify outputs. Checksum is computed using SHA-1 algorithm
- actualChecksum = hashing(outputs);
- LOG.info("Generated checksum for output data: {}", actualChecksum);
+ // Verify outputs. Checksum is computed using SHA-1 algorithm
+ actualChecksum = computeHash(outputs);
+ LOG.debug("Generated checksum: {}", actualChecksum);
- return actualChecksum.equals(expectedChecksum);
- } catch (IOException e) {
- throw new RuntimeException(
- String.format("Failed to read from path: %s", filePath));
- }
+ return actualChecksum.equals(expectedChecksum);
}
- private List<String> readLines(String path) throws IOException {
- List<String> readData = new ArrayList<>();
- IOChannelFactory factory = IOChannelUtils.getFactory(path);
-
- // Match inputPath which may contains glob
- Collection<String> files = factory.match(path);
+ @VisibleForTesting
+ List<String> readFilesWithRetries(Sleeper sleeper, BackOff backOff)
+ throws IOException, InterruptedException {
+ IOChannelFactory factory = IOChannelUtils.getFactory(filePath);
+ IOException lastException = null;
+
+ do {
+ try {
+ // Match inputPath which may contains glob
+ Collection<String> files = factory.match(filePath);
+ LOG.debug("Found {} file(s) by matching the path: {}", files.size(), filePath);
+
+ if (files.isEmpty() || !checkTotalNumOfFiles(files)) {
+ continue;
+ }
+
+ // Read data from file paths
+ return readLines(files, factory);
+ } catch (IOException e) {
+ // Ignore and retry
+ lastException = e;
+ LOG.warn("Error in file reading. Ignore and retry.");
+ }
+ } while(BackOffUtils.next(sleeper, backOff));
+ // Failed after max retries
+ throw new IOException(
+ String.format("Unable to read file(s) after retrying %d times", MAX_READ_RETRIES),
+ lastException);
+ }
- // Read data from file paths
- int i = 0;
+ @VisibleForTesting
+ List<String> readLines(Collection<String> files, IOChannelFactory factory) throws IOException {
+ List<String> allLines = Lists.newArrayList();
+ int i = 1;
for (String file : files) {
try (Reader reader =
- Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) {
+ Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) {
List<String> lines = CharStreams.readLines(reader);
- readData.addAll(lines);
- LOG.info(
- "[{} of {}] Read {} lines from file: {}", i, files.size() - 1, lines.size(), file);
+ allLines.addAll(lines);
+ LOG.debug(
+ "[{} of {}] Read {} lines from file: {}", i, files.size(), lines.size(), file);
}
i++;
}
- return readData;
+ return allLines;
+ }
+
+ /**
+ * Check if total number of files is correct by comparing with the number that
+ * is parsed from shard name using a name template. If no template is specified,
+ * "SSSS-of-NNNN" will be used as default, and "NNNN" will be the expected total
+ * number of files.
+ *
+ * @return {@code true} if at least one shard name matches template and total number
+ * of given files equals the number that is parsed from shard name.
+ */
+ @VisibleForTesting
+ boolean checkTotalNumOfFiles(Collection<String> files) {
+ for (String filePath : files) {
+ Path fileName = Paths.get(filePath).getFileName();
+ if (fileName == null) {
+ // this path has zero elements
+ continue;
+ }
+ Matcher matcher = shardTemplate.matcher(fileName.toString());
+ if (!matcher.matches()) {
+ // shard name doesn't match the pattern, check with the next shard
+ continue;
+ }
+ // once match, extract total number of shards and compare to file list
+ return files.size() == Integer.parseInt(matcher.group("numshards"));
+ }
+ return false;
}
- private String hashing(List<String> strs) {
+ private String computeHash(@Nonnull List<String> strs) {
+ if (strs.isEmpty()) {
+ return Hashing.sha1().hashString("", StandardCharsets.UTF_8).toString();
+ }
+
List<HashCode> hashCodes = new ArrayList<>();
for (String str : strs) {
hashCodes.add(Hashing.sha1().hashString(str, StandardCharsets.UTF_8));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d08a9f12/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
index b2f2ec8..0dc307d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
@@ -19,12 +19,20 @@ package org.apache.beam.sdk.testing;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import com.google.api.client.util.BackOff;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.regex.Pattern;
+
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.junit.Rule;
import org.junit.Test;
@@ -42,10 +50,14 @@ public class FileChecksumMatcherTest {
public TemporaryFolder tmpFolder = new TemporaryFolder();
@Rule
public ExpectedException thrown = ExpectedException.none();
+ @Rule
+ public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();
@Mock
private PipelineResult pResult = Mockito.mock(PipelineResult.class);
+ private BackOff backOff = FileChecksumMatcher.BACK_OFF_FACTORY.backoff();
+
@Test
public void testPreconditionChecksumIsNull() throws IOException {
String tmpPath = tmpFolder.newFile().getPath();
@@ -79,8 +91,20 @@ public class FileChecksumMatcherTest {
}
@Test
- public void testMatcherVerifySingleFile() throws IOException{
- File tmpFile = tmpFolder.newFile();
+ public void testPreconditionShardTemplateIsNull() throws IOException {
+ String tmpPath = tmpFolder.newFile().getPath();
+
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage(
+ containsString(
+ "Expected non-null shard pattern. "
+ + "Please call the other constructor to use default pattern:"));
+ new FileChecksumMatcher("checksumString", tmpPath, null);
+ }
+
+ @Test
+ public void testMatcherThatVerifiesSingleFile() throws IOException{
+ File tmpFile = tmpFolder.newFile("result-000-of-001");
Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8);
FileChecksumMatcher matcher =
new FileChecksumMatcher("a8772322f5d7b851777f820fc79d050f9d302915", tmpFile.getPath());
@@ -89,16 +113,113 @@ public class FileChecksumMatcherTest {
}
@Test
- public void testMatcherVerifyMultipleFilesInOneDir() throws IOException {
- File tmpFile1 = tmpFolder.newFile();
- File tmpFile2 = tmpFolder.newFile();
+ public void testMatcherThatVerifiesMultipleFiles() throws IOException {
+ File tmpFile1 = tmpFolder.newFile("result-000-of-002");
+ File tmpFile2 = tmpFolder.newFile("result-001-of-002");
+ File tmpFile3 = tmpFolder.newFile("tmp");
Files.write("To be or not to be, ", tmpFile1, StandardCharsets.UTF_8);
Files.write("it is not a question.", tmpFile2, StandardCharsets.UTF_8);
+ Files.write("tmp", tmpFile3, StandardCharsets.UTF_8);
+
FileChecksumMatcher matcher =
new FileChecksumMatcher(
"90552392c28396935fe4f123bd0b5c2d0f6260c8",
+ IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "result-*"));
+
+ assertThat(pResult, matcher);
+ }
+
+ @Test
+ public void testMatcherThatVerifiesFileWithEmptyContent() throws IOException {
+ File emptyFile = tmpFolder.newFile("result-000-of-001");
+ Files.write("", emptyFile, StandardCharsets.UTF_8);
+ FileChecksumMatcher matcher =
+ new FileChecksumMatcher(
+ "da39a3ee5e6b4b0d3255bfef95601890afd80709",
IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"));
assertThat(pResult, matcher);
}
+
+ @Test
+ public void testMatcherThatUsesCustomizedTemplate() throws Exception {
+ // Customized template: resultSSS-totalNNN
+ File tmpFile1 = tmpFolder.newFile("result0-total2");
+ File tmpFile2 = tmpFolder.newFile("result1-total2");
+ Files.write("To be or not to be, ", tmpFile1, StandardCharsets.UTF_8);
+ Files.write("it is not a question.", tmpFile2, StandardCharsets.UTF_8);
+
+ Pattern customizedTemplate =
+ Pattern.compile("(?x) result (?<shardnum>\\d+) - total (?<numshards>\\d+)");
+ FileChecksumMatcher matcher = new FileChecksumMatcher(
+ "90552392c28396935fe4f123bd0b5c2d0f6260c8",
+ IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"),
+ customizedTemplate);
+
+ assertThat(pResult, matcher);
+ }
+
+ @Test
+ public void testReadWithRetriesFailsWhenTemplateIncorrect() throws Exception {
+ File tmpFile = tmpFolder.newFile();
+ Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8);
+
+ FileChecksumMatcher matcher = new FileChecksumMatcher(
+ "mock-checksum",
+ IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"),
+ Pattern.compile("incorrect-template"));
+
+ thrown.expect(IOException.class);
+ thrown.expectMessage(
+ containsString(
+ "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES));
+ matcher.readFilesWithRetries(fastClock, backOff);
+ }
+
+ @Test
+ public void testReadWithRetriesFailsSinceFilesystemError() throws Exception {
+ File tmpFile = tmpFolder.newFile();
+ Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8);
+
+ FileChecksumMatcher matcher =
+ spy(new FileChecksumMatcher(
+ "mock-checksum", IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*")));
+ doThrow(IOException.class)
+ .when(matcher).readLines(anyCollection(), any(IOChannelFactory.class));
+
+ thrown.expect(IOException.class);
+ thrown.expectMessage(
+ containsString(
+ "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES));
+ matcher.readFilesWithRetries(fastClock, backOff);
+ }
+
+ @Test
+ public void testReadWithRetriesFailsWhenOutputDirEmpty() throws Exception {
+ FileChecksumMatcher matcher =
+ new FileChecksumMatcher(
+ "mock-checksum", IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"));
+
+ thrown.expect(IOException.class);
+ thrown.expectMessage(
+ containsString(
+ "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES));
+ matcher.readFilesWithRetries(fastClock, backOff);
+ }
+
+ @Test
+ public void testReadWithRetriesFailsWhenRedundantFileLoaded() throws Exception {
+ tmpFolder.newFile("result-000-of-001");
+ tmpFolder.newFile("tmp-result-000-of-001");
+
+ FileChecksumMatcher matcher =
+ new FileChecksumMatcher(
+ "mock-checksum", IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"));
+
+ thrown.expect(IOException.class);
+ thrown.expectMessage(
+ containsString(
+ "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES));
+ matcher.readFilesWithRetries(fastClock, backOff);
+ }
}
[2/2] incubator-beam git commit: This closes #1189
Posted by ke...@apache.org.
This closes #1189
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b75a7645
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b75a7645
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b75a7645
Branch: refs/heads/master
Commit: b75a76459e18c032af7981197acd0c1fb826bc97
Parents: 8042d52 d08a9f1
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Nov 30 09:53:02 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Nov 30 09:53:02 2016 -0800
----------------------------------------------------------------------
.../org/apache/beam/examples/WordCountIT.java | 2 +-
.../beam/sdk/testing/FileChecksumMatcher.java | 168 ++++++++++++++++---
.../sdk/testing/FileChecksumMatcherTest.java | 131 ++++++++++++++-
3 files changed, 268 insertions(+), 33 deletions(-)
----------------------------------------------------------------------