You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/30 17:53:19 UTC

[1/2] incubator-beam git commit: [BEAM-747] Fix FileChecksumMatcher That Inconsistent With FS

Repository: incubator-beam
Updated Branches:
  refs/heads/master 8042d52fc -> b75a76459


[BEAM-747] Fix FileChecksumMatcher That Inconsistent With FS


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d08a9f12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d08a9f12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d08a9f12

Branch: refs/heads/master
Commit: d08a9f1278e0bb4ec5b08e11f6267c516c8ea56e
Parents: 8042d52
Author: Mark Liu <ma...@markliu0.mtv.corp.google.com>
Authored: Tue Oct 25 14:57:13 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Nov 30 09:52:34 2016 -0800

----------------------------------------------------------------------
 .../org/apache/beam/examples/WordCountIT.java   |   2 +-
 .../beam/sdk/testing/FileChecksumMatcher.java   | 168 ++++++++++++++++---
 .../sdk/testing/FileChecksumMatcherTest.java    | 131 ++++++++++++++-
 3 files changed, 268 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d08a9f12/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
index f2afe6a..01438de 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
@@ -65,7 +65,7 @@ public class WordCountIT {
         "output",
         "results"));
     options.setOnSuccessMatcher(
-        new FileChecksumMatcher(DEFAULT_OUTPUT_CHECKSUM, options.getOutput() + "*"));
+        new FileChecksumMatcher(DEFAULT_OUTPUT_CHECKSUM, options.getOutput() + "*-of-*"));
 
     WordCount.main(TestPipeline.convertToArgs(options));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d08a9f12/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
index de6cea3..4b249fe 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
@@ -19,8 +19,14 @@
 package org.apache.beam.sdk.testing;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 import com.google.common.hash.HashCode;
 import com.google.common.hash.Hashing;
 import com.google.common.io.CharStreams;
@@ -28,14 +34,21 @@ import java.io.IOException;
 import java.io.Reader;
 import java.nio.channels.Channels;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.annotation.Nonnull;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.hamcrest.Description;
 import org.hamcrest.TypeSafeMatcher;
+import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,71 +56,172 @@ import org.slf4j.LoggerFactory;
  * Matcher to verify file checksum in E2E test.
  *
  * <p>For example:
- * <pre>{@code [
- *   assertTrue(job, new FileChecksumMatcher(checksumString, filePath));
- * ]}</pre>
+ * <pre>{@code
+ *   assertThat(job, new FileChecksumMatcher(checksumString, filePath));
+ * }</pre>
+ * or
+ * <pre>{@code
+ *   assertThat(job, new FileChecksumMatcher(checksumString, filePath, shardTemplate));
+ * }</pre>
+ *
+ * <p>The checksum of the outputs is computed with the SHA-1 algorithm. If the output is empty,
+ * the SHA-1 hash of the empty string (da39a3ee5e6b4b0d3255bfef95601890afd80709) is expected.
  */
 public class FileChecksumMatcher extends TypeSafeMatcher<PipelineResult>
     implements SerializableMatcher<PipelineResult> {
 
   private static final Logger LOG = LoggerFactory.getLogger(FileChecksumMatcher.class);
 
+  static final int MAX_READ_RETRIES = 4;
+  static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L);
+  static final FluentBackoff BACK_OFF_FACTORY =
+      FluentBackoff.DEFAULT
+          .withInitialBackoff(DEFAULT_SLEEP_DURATION)
+          .withMaxRetries(MAX_READ_RETRIES);
+
+  private static final Pattern DEFAULT_SHARD_TEMPLATE =
+      Pattern.compile("(?x) \\S* (?<shardnum> \\d+) -of- (?<numshards> \\d+)");
+
   private final String expectedChecksum;
   private final String filePath;
+  private final Pattern shardTemplate;
   private String actualChecksum;
 
+  /**
+   * Constructor that uses default shard template.
+   *
+   * @param checksum expected checksum string used to verify file content.
+   * @param filePath path of the files to be verified.
+   */
   public FileChecksumMatcher(String checksum, String filePath) {
+    this(checksum, filePath, DEFAULT_SHARD_TEMPLATE);
+  }
+
+  /**
+   * Constructor.
+   *
+   * @param checksum expected checksum string used to verify file content.
+   * @param filePath path of the files to be verified.
+   * @param shardTemplate pattern for shard file names, used to parse out the total number
+   *                      of shards; file matching is retried until that many files are
+   *                      found, to guard against filesystem inconsistency. A customized
+   *                      pattern must name the total-shard-count group "numshards".
+   */
+  public FileChecksumMatcher(String checksum, String filePath, Pattern shardTemplate) {
     checkArgument(
         !Strings.isNullOrEmpty(checksum),
         "Expected valid checksum, but received %s", checksum);
     checkArgument(
         !Strings.isNullOrEmpty(filePath),
         "Expected valid file path, but received %s", filePath);
+    checkNotNull(
+        shardTemplate,
+        "Expected non-null shard pattern. "
+            + "Please call the other constructor to use default pattern: %s",
+        DEFAULT_SHARD_TEMPLATE);
 
     this.expectedChecksum = checksum;
     this.filePath = filePath;
+    this.shardTemplate = shardTemplate;
   }
 
   @Override
   public boolean matchesSafely(PipelineResult pipelineResult) {
+    // Load output data
+    List<String> outputs;
     try {
-      // Load output data
-      List<String> outputs = readLines(filePath);
+      outputs = readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff());
+    } catch (Exception e) {
+      throw new RuntimeException(String.format("Failed to read from: %s", filePath), e);
+    }
 
-      // Verify outputs. Checksum is computed using SHA-1 algorithm
-      actualChecksum = hashing(outputs);
-      LOG.info("Generated checksum for output data: {}", actualChecksum);
+    // Verify outputs. Checksum is computed using SHA-1 algorithm
+    actualChecksum = computeHash(outputs);
+    LOG.debug("Generated checksum: {}", actualChecksum);
 
-      return actualChecksum.equals(expectedChecksum);
-    } catch (IOException e) {
-      throw new RuntimeException(
-          String.format("Failed to read from path: %s", filePath));
-    }
+    return actualChecksum.equals(expectedChecksum);
   }
 
-  private List<String> readLines(String path) throws IOException {
-    List<String> readData = new ArrayList<>();
-    IOChannelFactory factory = IOChannelUtils.getFactory(path);
-
-    // Match inputPath which may contains glob
-    Collection<String> files = factory.match(path);
+  @VisibleForTesting
+  List<String> readFilesWithRetries(Sleeper sleeper, BackOff backOff)
+      throws IOException, InterruptedException {
+    IOChannelFactory factory = IOChannelUtils.getFactory(filePath);
+    IOException lastException = null;
+
+    do {
+      try {
+        // Match filePath, which may contain a glob
+        Collection<String> files = factory.match(filePath);
+        LOG.debug("Found {} file(s) by matching the path: {}", files.size(), filePath);
+
+        if (files.isEmpty() || !checkTotalNumOfFiles(files)) {
+          continue;
+        }
+
+        // Read data from file paths
+        return readLines(files, factory);
+      } catch (IOException e) {
+        // Ignore and retry
+        lastException = e;
+        LOG.warn("Error in file reading. Ignore and retry.");
+      }
+    } while(BackOffUtils.next(sleeper, backOff));
+    // Failed after max retries
+    throw new IOException(
+        String.format("Unable to read file(s) after retrying %d times", MAX_READ_RETRIES),
+        lastException);
+  }
 
-    // Read data from file paths
-    int i = 0;
+  @VisibleForTesting
+  List<String> readLines(Collection<String> files, IOChannelFactory factory) throws IOException {
+    List<String> allLines = Lists.newArrayList();
+    int i = 1;
     for (String file : files) {
       try (Reader reader =
-          Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) {
+               Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) {
         List<String> lines = CharStreams.readLines(reader);
-        readData.addAll(lines);
-        LOG.info(
-            "[{} of {}] Read {} lines from file: {}", i, files.size() - 1, lines.size(), file);
+        allLines.addAll(lines);
+        LOG.debug(
+            "[{} of {}] Read {} lines from file: {}", i, files.size(), lines.size(), file);
       }
       i++;
     }
-    return readData;
+    return allLines;
+  }
+
+  /**
+   * Check if total number of files is correct by comparing with the number that
+   * is parsed from shard name using a name template. If no template is specified,
+   * "SSSS-of-NNNN" will be used as default, and "NNNN" will be the expected total
+   * number of files.
+   *
+   * @return {@code true} if at least one shard name matches template and total number
+   * of given files equals the number that is parsed from shard name.
+   */
+  @VisibleForTesting
+  boolean checkTotalNumOfFiles(Collection<String> files) {
+    for (String filePath : files) {
+      Path fileName = Paths.get(filePath).getFileName();
+      if (fileName == null) {
+        // this path has zero elements
+        continue;
+      }
+      Matcher matcher = shardTemplate.matcher(fileName.toString());
+      if (!matcher.matches()) {
+        // shard name doesn't match the pattern, check with the next shard
+        continue;
+      }
+      // on first match, extract the total shard count and compare it to the number of files
+      return files.size() == Integer.parseInt(matcher.group("numshards"));
+    }
+    return false;
   }
 
-  private String hashing(List<String> strs) {
+  private String computeHash(@Nonnull List<String> strs) {
+    if (strs.isEmpty()) {
+      return Hashing.sha1().hashString("", StandardCharsets.UTF_8).toString();
+    }
+
     List<HashCode> hashCodes = new ArrayList<>();
     for (String str : strs) {
       hashCodes.add(Hashing.sha1().hashString(str, StandardCharsets.UTF_8));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d08a9f12/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
index b2f2ec8..0dc307d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
@@ -19,12 +19,20 @@ package org.apache.beam.sdk.testing;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
 
+import com.google.api.client.util.BackOff;
 import com.google.common.io.Files;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.regex.Pattern;
+
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.junit.Rule;
 import org.junit.Test;
@@ -42,10 +50,14 @@ public class FileChecksumMatcherTest {
   public TemporaryFolder tmpFolder = new TemporaryFolder();
   @Rule
   public ExpectedException thrown = ExpectedException.none();
+  @Rule
+  public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();
 
   @Mock
   private PipelineResult pResult = Mockito.mock(PipelineResult.class);
 
+  private BackOff backOff = FileChecksumMatcher.BACK_OFF_FACTORY.backoff();
+
   @Test
   public void testPreconditionChecksumIsNull() throws IOException {
     String tmpPath = tmpFolder.newFile().getPath();
@@ -79,8 +91,20 @@ public class FileChecksumMatcherTest {
   }
 
   @Test
-  public void testMatcherVerifySingleFile() throws IOException{
-    File tmpFile = tmpFolder.newFile();
+  public void testPreconditionShardTemplateIsNull() throws IOException {
+    String tmpPath = tmpFolder.newFile().getPath();
+
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage(
+        containsString(
+            "Expected non-null shard pattern. "
+                + "Please call the other constructor to use default pattern:"));
+    new FileChecksumMatcher("checksumString", tmpPath, null);
+  }
+
+  @Test
+  public void testMatcherThatVerifiesSingleFile() throws IOException{
+    File tmpFile = tmpFolder.newFile("result-000-of-001");
     Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8);
     FileChecksumMatcher matcher =
         new FileChecksumMatcher("a8772322f5d7b851777f820fc79d050f9d302915", tmpFile.getPath());
@@ -89,16 +113,113 @@ public class FileChecksumMatcherTest {
   }
 
   @Test
-  public void testMatcherVerifyMultipleFilesInOneDir() throws IOException {
-    File tmpFile1 = tmpFolder.newFile();
-    File tmpFile2 = tmpFolder.newFile();
+  public void testMatcherThatVerifiesMultipleFiles() throws IOException {
+    File tmpFile1 = tmpFolder.newFile("result-000-of-002");
+    File tmpFile2 = tmpFolder.newFile("result-001-of-002");
+    File tmpFile3 = tmpFolder.newFile("tmp");
     Files.write("To be or not to be, ", tmpFile1, StandardCharsets.UTF_8);
     Files.write("it is not a question.", tmpFile2, StandardCharsets.UTF_8);
+    Files.write("tmp", tmpFile3, StandardCharsets.UTF_8);
+
     FileChecksumMatcher matcher =
         new FileChecksumMatcher(
             "90552392c28396935fe4f123bd0b5c2d0f6260c8",
+            IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "result-*"));
+
+    assertThat(pResult, matcher);
+  }
+
+  @Test
+  public void testMatcherThatVerifiesFileWithEmptyContent() throws IOException {
+    File emptyFile = tmpFolder.newFile("result-000-of-001");
+    Files.write("", emptyFile, StandardCharsets.UTF_8);
+    FileChecksumMatcher matcher =
+        new FileChecksumMatcher(
+            "da39a3ee5e6b4b0d3255bfef95601890afd80709",
             IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"));
 
     assertThat(pResult, matcher);
   }
+
+  @Test
+  public void testMatcherThatUsesCustomizedTemplate() throws Exception {
+    // Customized template: resultSSS-totalNNN
+    File tmpFile1 = tmpFolder.newFile("result0-total2");
+    File tmpFile2 = tmpFolder.newFile("result1-total2");
+    Files.write("To be or not to be, ", tmpFile1, StandardCharsets.UTF_8);
+    Files.write("it is not a question.", tmpFile2, StandardCharsets.UTF_8);
+
+    Pattern customizedTemplate =
+        Pattern.compile("(?x) result (?<shardnum>\\d+) - total (?<numshards>\\d+)");
+    FileChecksumMatcher matcher = new FileChecksumMatcher(
+        "90552392c28396935fe4f123bd0b5c2d0f6260c8",
+        IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"),
+        customizedTemplate);
+
+    assertThat(pResult, matcher);
+  }
+
+  @Test
+  public void testReadWithRetriesFailsWhenTemplateIncorrect() throws Exception {
+    File tmpFile = tmpFolder.newFile();
+    Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8);
+
+    FileChecksumMatcher matcher = new FileChecksumMatcher(
+        "mock-checksum",
+        IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"),
+        Pattern.compile("incorrect-template"));
+
+    thrown.expect(IOException.class);
+    thrown.expectMessage(
+        containsString(
+            "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES));
+    matcher.readFilesWithRetries(fastClock, backOff);
+  }
+
+  @Test
+  public void testReadWithRetriesFailsSinceFilesystemError() throws Exception {
+    File tmpFile = tmpFolder.newFile();
+    Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8);
+
+    FileChecksumMatcher matcher =
+        spy(new FileChecksumMatcher(
+            "mock-checksum", IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*")));
+    doThrow(IOException.class)
+        .when(matcher).readLines(anyCollection(), any(IOChannelFactory.class));
+
+    thrown.expect(IOException.class);
+    thrown.expectMessage(
+        containsString(
+            "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES));
+    matcher.readFilesWithRetries(fastClock, backOff);
+  }
+
+  @Test
+  public void testReadWithRetriesFailsWhenOutputDirEmpty() throws Exception {
+    FileChecksumMatcher matcher =
+        new FileChecksumMatcher(
+            "mock-checksum", IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"));
+
+    thrown.expect(IOException.class);
+    thrown.expectMessage(
+        containsString(
+            "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES));
+    matcher.readFilesWithRetries(fastClock, backOff);
+  }
+
+  @Test
+  public void testReadWithRetriesFailsWhenRedundantFileLoaded() throws Exception {
+    tmpFolder.newFile("result-000-of-001");
+    tmpFolder.newFile("tmp-result-000-of-001");
+
+    FileChecksumMatcher matcher =
+        new FileChecksumMatcher(
+            "mock-checksum", IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"));
+
+    thrown.expect(IOException.class);
+    thrown.expectMessage(
+        containsString(
+            "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES));
+    matcher.readFilesWithRetries(fastClock, backOff);
+  }
 }


[2/2] incubator-beam git commit: This closes #1189

Posted by ke...@apache.org.
This closes #1189


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b75a7645
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b75a7645
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b75a7645

Branch: refs/heads/master
Commit: b75a76459e18c032af7981197acd0c1fb826bc97
Parents: 8042d52 d08a9f1
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Nov 30 09:53:02 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Nov 30 09:53:02 2016 -0800

----------------------------------------------------------------------
 .../org/apache/beam/examples/WordCountIT.java   |   2 +-
 .../beam/sdk/testing/FileChecksumMatcher.java   | 168 ++++++++++++++++---
 .../sdk/testing/FileChecksumMatcherTest.java    | 131 ++++++++++++++-
 3 files changed, 268 insertions(+), 33 deletions(-)
----------------------------------------------------------------------