You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2019/01/23 20:17:02 UTC

[beam] branch master updated: [BEAM-6491] Remove race in FileIOTest.testMatchWatchForNewFiles

This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new cb0fdbc  [BEAM-6491] Remove race in FileIOTest.testMatchWatchForNewFiles
     new 315b1bc  Merge pull request #7599: [BEAM-6491] Remove race in FileIOTest.testMatchWatchForNewFiles
cb0fdbc is described below

commit cb0fdbc327145bf479015ae7f651eb99b5f8d2be
Author: Jeff Klukas <je...@klukas.net>
AuthorDate: Wed Jan 23 08:36:50 2019 -0500

    [BEAM-6491] Remove race in FileIOTest.testMatchWatchForNewFiles
    
    This test was checking lastModifiedTime for files that the writer thread might
    not have created yet. We now create the files beforehand so lastModifiedTime is
    known, and have the writer thread copy them into place, preserving the metadata.
---
 .../java/org/apache/beam/sdk/io/FileIOTest.java    | 47 +++++++++++++---------
 1 file changed, 29 insertions(+), 18 deletions(-)

diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
index 27ed431..70d77f7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
@@ -30,8 +30,10 @@ import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.Serializable;
 import java.io.Writer;
+import java.nio.file.CopyOption;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
 import java.nio.file.attribute.FileTime;
 import java.util.Arrays;
 import java.util.List;
@@ -55,7 +57,6 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Charsets;
 import org.joda.time.Duration;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -200,19 +201,26 @@ public class FileIOTest implements Serializable {
 
   @Test
   @Category(NeedsRunner.class)
-  @Ignore("https://issues.apache.org/jira/browse/BEAM-6491")
   public void testMatchWatchForNewFiles() throws IOException, InterruptedException {
-    final Path basePath = tmpFolder.getRoot().toPath().resolve("watch");
-    basePath.toFile().mkdir();
+    // Write some files to a "source" directory.
+    final Path sourcePath = tmpFolder.getRoot().toPath().resolve("source");
+    sourcePath.toFile().mkdir();
+    Files.write(sourcePath.resolve("first"), new byte[42]);
+    Files.write(sourcePath.resolve("second"), new byte[37]);
+    Files.write(sourcePath.resolve("third"), new byte[99]);
+
+    // Create the "watch" directory the pipeline monitors; the writer thread copies files into it.
+    final Path watchPath = tmpFolder.getRoot().toPath().resolve("watch");
+    watchPath.toFile().mkdir();
     PCollection<MatchResult.Metadata> matchMetadata =
         p.apply(
             FileIO.match()
-                .filepattern(basePath.resolve("*").toString())
+                .filepattern(watchPath.resolve("*").toString())
                 .continuously(
                     Duration.millis(100),
                     Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3))));
     PCollection<MatchResult.Metadata> matchAllMetadata =
-        p.apply(Create.of(basePath.resolve("*").toString()))
+        p.apply(Create.of(watchPath.resolve("*").toString()))
             .apply(
                 FileIO.matchAll()
                     .continuously(
@@ -221,36 +229,35 @@ public class FileIOTest implements Serializable {
     assertEquals(PCollection.IsBounded.UNBOUNDED, matchMetadata.isBounded());
     assertEquals(PCollection.IsBounded.UNBOUNDED, matchAllMetadata.isBounded());
 
+    // Copy the files to the "watch" directory, preserving the lastModifiedTime;
+    // the COPY_ATTRIBUTES option ensures that we will at a minimum copy lastModifiedTime.
+    CopyOption[] copyOptions = {StandardCopyOption.COPY_ATTRIBUTES};
     Thread writer =
         new Thread(
             () -> {
               try {
                 Thread.sleep(1000);
-                Files.write(basePath.resolve("first"), new byte[42]);
+                Files.copy(sourcePath.resolve("first"), watchPath.resolve("first"), copyOptions);
                 Thread.sleep(300);
-                Files.write(basePath.resolve("second"), new byte[37]);
+                Files.copy(sourcePath.resolve("second"), watchPath.resolve("second"), copyOptions);
                 Thread.sleep(300);
-                Files.write(basePath.resolve("third"), new byte[99]);
+                Files.copy(sourcePath.resolve("third"), watchPath.resolve("third"), copyOptions);
               } catch (IOException | InterruptedException e) {
                 throw new RuntimeException(e);
               }
             });
     writer.start();
 
+    // We fetch lastModifiedTime from the files in the "source" directory to avoid a race condition
+    // with the writer thread.
     List<MatchResult.Metadata> expected =
         Arrays.asList(
             metadata(
-                basePath.resolve("first"),
-                42,
-                Files.getLastModifiedTime(basePath.resolve("first")).toMillis()),
+                watchPath.resolve("first"), 42, lastModifiedMillis(sourcePath.resolve("first"))),
             metadata(
-                basePath.resolve("second"),
-                37,
-                Files.getLastModifiedTime(basePath.resolve("second")).toMillis()),
+                watchPath.resolve("second"), 37, lastModifiedMillis(sourcePath.resolve("second"))),
             metadata(
-                basePath.resolve("third"),
-                99,
-                Files.getLastModifiedTime(basePath.resolve("third")).toMillis()));
+                watchPath.resolve("third"), 99, lastModifiedMillis(sourcePath.resolve("third"))));
     PAssert.that(matchMetadata).containsInAnyOrder(expected);
     PAssert.that(matchAllMetadata).containsInAnyOrder(expected);
     p.run();
@@ -334,6 +341,10 @@ public class FileIOTest implements Serializable {
         .build();
   }
 
+  private static long lastModifiedMillis(Path path) throws IOException {
+    return Files.getLastModifiedTime(path).toMillis();
+  }
+
   private static FileIO.Write.FileNaming resolveFileNaming(FileIO.Write<?, ?> write)
       throws Exception {
     return write.resolveFileNamingFn().getClosure().apply(null, null);