Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/15 00:25:56 UTC

[GitHub] [beam] Abacn opened a new pull request, #21877: Followup fix FileIOTest.testMatchWatchForNewFiles flaky

Abacn opened a new pull request, #21877:
URL: https://github.com/apache/beam/pull/21877

   Follow-up fix for the flaky test introduced in #21570.
   
   The test has been flaky because the pipeline starts an indeterminate amount of time after `p.run()` is called. Wrapping the thread that writes the files resolves this timing issue (the integration test uses the same test structure).
   
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Abacn commented on pull request #21877: Followup fix FileIOTest.testMatchWatchForNewFiles flaky

Posted by GitBox <gi...@apache.org>.
Abacn commented on PR #21877:
URL: https://github.com/apache/beam/pull/21877#issuecomment-1156725638

   Run Java PostCommit




[GitHub] [beam] johnjcasey commented on pull request #21877: Followup fix FileIOTest.testMatchWatchForNewFiles flaky

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21877:
URL: https://github.com/apache/beam/pull/21877#issuecomment-1164545072

   LGTM




[GitHub] [beam] Abacn commented on pull request #21877: Followup fix FileIOTest.testMatchWatchForNewFiles flaky

Posted by GitBox <gi...@apache.org>.
Abacn commented on PR #21877:
URL: https://github.com/apache/beam/pull/21877#issuecomment-1164698323

   R: @kennknowles 




[GitHub] [beam] Abacn commented on pull request #21877: Followup fix FileIOTest.testMatchWatchForNewFiles flaky

Posted by GitBox <gi...@apache.org>.
Abacn commented on PR #21877:
URL: https://github.com/apache/beam/pull/21877#issuecomment-1163354318

   Run Java PreCommit




[GitHub] [beam] Abacn commented on pull request #21877: Followup fix FileIOTest.testMatchWatchForNewFiles flaky

Posted by GitBox <gi...@apache.org>.
Abacn commented on PR #21877:
URL: https://github.com/apache/beam/pull/21877#issuecomment-1156722459

   Previous flakes seen in:
   https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/5556/
   https://ci-beam.apache.org/view/PostCommit/job/beam_PostCommit_Java_DataflowV1/1738/




[GitHub] [beam] kennknowles merged pull request #21877: Followup fix FileIOTest.testMatchWatchForNewFiles flaky

Posted by GitBox <gi...@apache.org>.
kennknowles merged PR #21877:
URL: https://github.com/apache/beam/pull/21877




[GitHub] [beam] Abacn commented on pull request #21877: Followup fix FileIOTest.testMatchWatchForNewFiles flaky

Posted by GitBox <gi...@apache.org>.
Abacn commented on PR #21877:
URL: https://github.com/apache/beam/pull/21877#issuecomment-1156903493

   Run Java PostCommit




[GitHub] [beam] Abacn commented on pull request #21877: Followup fix FileIOTest.testMatchWatchForNewFiles flaky

Posted by GitBox <gi...@apache.org>.
Abacn commented on PR #21877:
URL: https://github.com/apache/beam/pull/21877#issuecomment-1164543021

   @johnjcasey PTAL




[GitHub] [beam] Abacn commented on pull request #21877: Followup fix FileIOTest.testMatchWatchForNewFiles flaky

Posted by GitBox <gi...@apache.org>.
Abacn commented on PR #21877:
URL: https://github.com/apache/beam/pull/21877#issuecomment-1156715224

   The tests are flaky mainly because there is no control over when the pipeline starts running. The file-writing thread may start too early, so the Watch transform fails to capture some of the changes. A test failure therefore does not indicate a functional problem in the methods being tested.
   
   To fix the flakiness, the logic of both the unit test and the integration test has been changed. In the unit test, the file rewrite is triggered from a downstream step of the pipeline. This eliminates the separate thread and guarantees that the second write happens after the Watch transform is running.
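The ordering guarantee described above can be illustrated outside Beam. The following is a minimal plain-Java analogy, not the test code from this PR: a `CountDownLatch` stands in for "the rewrite runs downstream of the watch", so the second write can only happen after the watcher has signalled that it is running, regardless of which thread the scheduler starts first. The class name and the event strings are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class OrderedRewriteSketch {
  // Hypothetical stand-in for the file system: a log of events in the order they happened.
  static final List<String> events = Collections.synchronizedList(new ArrayList<>());

  public static List<String> run() {
    events.clear();
    CountDownLatch watcherRunning = new CountDownLatch(1);

    Thread watcher =
        new Thread(
            () -> {
              events.add("watch-started");
              watcherRunning.countDown(); // signal that the "watch" is now observing
            });

    Thread rewriter =
        new Thread(
            () -> {
              try {
                watcherRunning.await(); // the rewrite waits for the watcher, like a downstream step
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
              }
              events.add("second-write");
            });

    events.add("first-write");
    rewriter.start(); // started first on purpose: the latch still enforces the order
    watcher.start();
    try {
      rewriter.join();
      watcher.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for threads", e);
    }
    return new ArrayList<>(events);
  }

  public static void main(String[] args) {
    System.out.println(run()); // [first-write, watch-started, second-write]
  }
}
```

Without the latch, the rewriter could record `second-write` before `watch-started`, which is the race the original test suffered from.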
   
   For the integration test, we observed that on the Dataflow runner a downstream stage may start several seconds after the previous stage. Putting the rewrite downstream could then mean it is never triggered within the Watch transform's wait time, causing another flake.
   The integration test therefore uses a different approach: a separate writer thread is started before the pipeline runs and torn down after the pipeline finishes, and the assertion logic is changed to be insensitive to when the pipeline starts and stops.
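The integration-test approach can also be sketched in plain Java, assuming an in-memory `ConcurrentHashMap` in place of the GCS bucket and a polling loop in place of the Watch transform (both hypothetical stand-ins, not Beam APIs). The writer is started before the "pipeline" and interrupted after it ends, and the check only requires that the sizes observed for `second` are non-empty and strictly increasing, so it holds no matter when polling starts or stops.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ContinuousWriterSketch {
  // Hypothetical in-memory stand-in for the bucket: file name -> size in bytes.
  static final Map<String, Integer> bucket = new ConcurrentHashMap<>();

  // Writes "first" once, then rewrites "second" with a growing size until interrupted.
  static Thread startWriter(long intervalMillis) {
    Thread writer =
        new Thread(
            () -> {
              int size = 1;
              bucket.put("first", size);
              while (!Thread.currentThread().isInterrupted()) {
                try {
                  Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                  return; // teardown requested while sleeping
                }
                bucket.put("second", size++);
              }
            });
    writer.start();
    return writer;
  }

  // Stand-in for the watch: polls until it has seen minDistinct sizes or the deadline passes.
  static List<Integer> watchSecond(int minDistinct, long timeoutMillis) {
    List<Integer> seen = new ArrayList<>();
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (seen.size() < minDistinct && System.currentTimeMillis() < deadline) {
      Integer size = bucket.get("second");
      if (size != null && (seen.isEmpty() || !size.equals(seen.get(seen.size() - 1)))) {
        seen.add(size);
      }
      try {
        Thread.sleep(5);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
    }
    return seen;
  }

  // Timing-insensitive check: any non-empty, strictly increasing sequence is accepted.
  static boolean isNonEmptyAndIncreasing(List<Integer> sizes) {
    for (int i = 1; i < sizes.size(); i++) {
      if (sizes.get(i) <= sizes.get(i - 1)) {
        return false;
      }
    }
    return !sizes.isEmpty();
  }

  public static List<Integer> run() {
    bucket.clear();
    Thread writer = startWriter(10); // set up before the "pipeline" runs
    try {
      return watchSecond(3, 5_000);
    } finally {
      writer.interrupt(); // torn down after the "pipeline" ends
      try {
        writer.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  public static void main(String[] args) {
    List<Integer> seen = run();
    System.out.println("first=" + bucket.get("first") + " ok=" + isNonEmptyAndIncreasing(seen));
  }
}
```

The key design choice is that the assertion never pins an exact list of sizes: whichever writes the watcher happens to catch, the sequence can only grow.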




[GitHub] [beam] johnjcasey commented on a diff in pull request #21877: Followup fix FileIOTest.testMatchWatchForNewFiles flaky

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on code in PR #21877:
URL: https://github.com/apache/beam/pull/21877#discussion_r898342199


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsMatchIT.java:
##########
@@ -25,128 +25,171 @@
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
-import java.util.Arrays;
+import java.nio.file.Path;
 import java.util.Collections;
 import java.util.Date;
-import java.util.List;
-import org.apache.beam.runners.direct.DirectRunner;
+import java.util.Objects;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.io.FileIO;
 import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Watch;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
 import org.joda.time.Duration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
 public class GcsMatchIT {
-  /** DoFn that writes test files to Gcs System when first time called. */
-  private static class WriteToGcsFn extends DoFn<GcsPath, Void> {
-    public WriteToGcsFn(long waitSec) {
-      this.waitSec = waitSec;
+  /** A thread that write to Gcs continuously. */
+  private static class WriteToPathContinuously extends Thread {
+    public WriteToPathContinuously(Path writePath, long interval) {
+      this.writePath = writePath;
+      this.interval = interval;
     }
 
-    @ProcessElement
-    public void processElement(ProcessContext context) {
-      GcsPath writePath = context.element();
-      assert writePath != null;
-      Thread writer =
-          new Thread(
-              () -> {
-                try {
-                  // Write test files to writePath
-                  Thread.sleep(waitSec * 1000);
-                  writeBytesToFile(writePath.resolve("first").toString(), 42);
-                  Thread.sleep(1000);
-                  writeBytesToFile(writePath.resolve("first").toString(), 99);
-                  writeBytesToFile(writePath.resolve("second").toString(), 42);
-                  Thread.sleep(1000);
-                  writeBytesToFile(writePath.resolve("first").toString(), 37);
-                  writeBytesToFile(writePath.resolve("second").toString(), 42);
-                  writeBytesToFile(writePath.resolve("third").toString(), 99);
-                } catch (InterruptedException e) {
-                  throw new RuntimeException(e);
-                }
-              });
-      writer.start();
+    @Override
+    public void run() {
+      int fileSize = 1;
+      // write a file at the beginning
+      writeBytesToFile(writePath.resolve("first").toString(), fileSize);
+
+      while (true) {
+        try {
+          Thread.sleep(interval);
+        } catch (InterruptedException e) {
+          return;

Review Comment:
   this should probably throw a runtime exception, just in case



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsMatchIT.java:
##########
@@ -25,128 +25,171 @@
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
-import java.util.Arrays;
+import java.nio.file.Path;
 import java.util.Collections;
 import java.util.Date;
-import java.util.List;
-import org.apache.beam.runners.direct.DirectRunner;
+import java.util.Objects;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.io.FileIO;
 import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Watch;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
 import org.joda.time.Duration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
 public class GcsMatchIT {
-  /** DoFn that writes test files to Gcs System when first time called. */
-  private static class WriteToGcsFn extends DoFn<GcsPath, Void> {
-    public WriteToGcsFn(long waitSec) {
-      this.waitSec = waitSec;
+  /** A thread that write to Gcs continuously. */
+  private static class WriteToPathContinuously extends Thread {
+    public WriteToPathContinuously(Path writePath, long interval) {
+      this.writePath = writePath;
+      this.interval = interval;
     }
 
-    @ProcessElement
-    public void processElement(ProcessContext context) {
-      GcsPath writePath = context.element();
-      assert writePath != null;
-      Thread writer =
-          new Thread(
-              () -> {
-                try {
-                  // Write test files to writePath
-                  Thread.sleep(waitSec * 1000);
-                  writeBytesToFile(writePath.resolve("first").toString(), 42);
-                  Thread.sleep(1000);
-                  writeBytesToFile(writePath.resolve("first").toString(), 99);
-                  writeBytesToFile(writePath.resolve("second").toString(), 42);
-                  Thread.sleep(1000);
-                  writeBytesToFile(writePath.resolve("first").toString(), 37);
-                  writeBytesToFile(writePath.resolve("second").toString(), 42);
-                  writeBytesToFile(writePath.resolve("third").toString(), 99);
-                } catch (InterruptedException e) {
-                  throw new RuntimeException(e);
-                }
-              });
-      writer.start();
+    @Override
+    public void run() {
+      int fileSize = 1;
+      // write a file at the beginning
+      writeBytesToFile(writePath.resolve("first").toString(), fileSize);
+
+      while (true) {
+        try {
+          Thread.sleep(interval);
+        } catch (InterruptedException e) {
+          return;
+        }
+        // write another file continuously
+        writeBytesToFile(writePath.resolve("second").toString(), fileSize);
+        fileSize += 1;

Review Comment:
   we should cap this at some number so the test isn't indefinite





[GitHub] [beam] Abacn commented on a diff in pull request #21877: Followup fix FileIOTest.testMatchWatchForNewFiles flaky

Posted by GitBox <gi...@apache.org>.
Abacn commented on code in PR #21877:
URL: https://github.com/apache/beam/pull/21877#discussion_r898376736


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsMatchIT.java:
##########
@@ -25,128 +25,171 @@
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
-import java.util.Arrays;
+import java.nio.file.Path;
 import java.util.Collections;
 import java.util.Date;
-import java.util.List;
-import org.apache.beam.runners.direct.DirectRunner;
+import java.util.Objects;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.io.FileIO;
 import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Watch;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
 import org.joda.time.Duration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
 public class GcsMatchIT {
-  /** DoFn that writes test files to Gcs System when first time called. */
-  private static class WriteToGcsFn extends DoFn<GcsPath, Void> {
-    public WriteToGcsFn(long waitSec) {
-      this.waitSec = waitSec;
+  /** A thread that write to Gcs continuously. */
+  private static class WriteToPathContinuously extends Thread {
+    public WriteToPathContinuously(Path writePath, long interval) {
+      this.writePath = writePath;
+      this.interval = interval;
     }
 
-    @ProcessElement
-    public void processElement(ProcessContext context) {
-      GcsPath writePath = context.element();
-      assert writePath != null;
-      Thread writer =
-          new Thread(
-              () -> {
-                try {
-                  // Write test files to writePath
-                  Thread.sleep(waitSec * 1000);
-                  writeBytesToFile(writePath.resolve("first").toString(), 42);
-                  Thread.sleep(1000);
-                  writeBytesToFile(writePath.resolve("first").toString(), 99);
-                  writeBytesToFile(writePath.resolve("second").toString(), 42);
-                  Thread.sleep(1000);
-                  writeBytesToFile(writePath.resolve("first").toString(), 37);
-                  writeBytesToFile(writePath.resolve("second").toString(), 42);
-                  writeBytesToFile(writePath.resolve("third").toString(), 99);
-                } catch (InterruptedException e) {
-                  throw new RuntimeException(e);
-                }
-              });
-      writer.start();
+    @Override
+    public void run() {
+      int fileSize = 1;
+      // write a file at the beginning
+      writeBytesToFile(writePath.resolve("first").toString(), fileSize);
+
+      while (true) {
+        try {
+          Thread.sleep(interval);
+        } catch (InterruptedException e) {
+          return;
+        }
+        // write another file continuously
+        writeBytesToFile(writePath.resolve("second").toString(), fileSize);
+        fileSize += 1;

Review Comment:
   Added a `!Thread.interrupted()` check, which is what should be used here, and also set a maximum file size.
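A minimal sketch of a writer loop that addresses both review points (stop on interrupt, cap the iterations), under stated assumptions: the write is abstracted as a hypothetical `IntConsumer` instead of the GCS helper, and the interval and cap values are placeholders, not the ones merged in the PR. Note that `Thread.interrupted()` also clears the interrupted status, which is harmless here because the thread exits right after. The sketch returns quietly when the sleep is interrupted, since interruption is the expected teardown signal; rethrowing it as a `RuntimeException`, as suggested earlier in the review, would surface unexpected interrupts but would also fire on every normal teardown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

public class BoundedWriterSketch {
  /**
   * Hypothetical writer loop: stops when interrupted or once maxSize has been written,
   * so it cannot run forever even if nobody interrupts it.
   */
  static Thread newWriter(long intervalMillis, int maxSize, IntConsumer writeSecond) {
    return new Thread(
        () -> {
          int size = 1;
          while (!Thread.interrupted() && size <= maxSize) {
            try {
              Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
              // Teardown interrupted the sleep: restore the flag and stop.
              Thread.currentThread().interrupt();
              return;
            }
            writeSecond.accept(size);
            size += 1;
          }
        });
  }

  public static List<Integer> runUntilCap() {
    List<Integer> writes = new ArrayList<>(); // only the writer touches it until join()
    Thread writer = newWriter(1, 5, writes::add);
    writer.start();
    try {
      writer.join(); // returns on its own because of the cap
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return writes;
  }

  public static void main(String[] args) {
    System.out.println(runUntilCap()); // [1, 2, 3, 4, 5]
  }
}
```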



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsMatchIT.java:
##########
@@ -25,128 +25,171 @@
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
-import java.util.Arrays;
+import java.nio.file.Path;
 import java.util.Collections;
 import java.util.Date;
-import java.util.List;
-import org.apache.beam.runners.direct.DirectRunner;
+import java.util.Objects;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.io.FileIO;
 import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Watch;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
 import org.joda.time.Duration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
 public class GcsMatchIT {
-  /** DoFn that writes test files to Gcs System when first time called. */
-  private static class WriteToGcsFn extends DoFn<GcsPath, Void> {
-    public WriteToGcsFn(long waitSec) {
-      this.waitSec = waitSec;
+  /** A thread that write to Gcs continuously. */
+  private static class WriteToPathContinuously extends Thread {
+    public WriteToPathContinuously(Path writePath, long interval) {
+      this.writePath = writePath;
+      this.interval = interval;
     }
 
-    @ProcessElement
-    public void processElement(ProcessContext context) {
-      GcsPath writePath = context.element();
-      assert writePath != null;
-      Thread writer =
-          new Thread(
-              () -> {
-                try {
-                  // Write test files to writePath
-                  Thread.sleep(waitSec * 1000);
-                  writeBytesToFile(writePath.resolve("first").toString(), 42);
-                  Thread.sleep(1000);
-                  writeBytesToFile(writePath.resolve("first").toString(), 99);
-                  writeBytesToFile(writePath.resolve("second").toString(), 42);
-                  Thread.sleep(1000);
-                  writeBytesToFile(writePath.resolve("first").toString(), 37);
-                  writeBytesToFile(writePath.resolve("second").toString(), 42);
-                  writeBytesToFile(writePath.resolve("third").toString(), 99);
-                } catch (InterruptedException e) {
-                  throw new RuntimeException(e);
-                }
-              });
-      writer.start();
+    @Override
+    public void run() {
+      int fileSize = 1;
+      // write a file at the beginning
+      writeBytesToFile(writePath.resolve("first").toString(), fileSize);
+
+      while (true) {
+        try {
+          Thread.sleep(interval);
+        } catch (InterruptedException e) {
+          return;

Review Comment:
   Thanks, fixed.





[GitHub] [beam] Abacn commented on pull request #21877: Followup fix FileIOTest.testMatchWatchForNewFiles flaky

Posted by GitBox <gi...@apache.org>.
Abacn commented on PR #21877:
URL: https://github.com/apache/beam/pull/21877#issuecomment-1157707316

   Run Java PreCommit




[GitHub] [beam] asf-ci commented on pull request #21877: Followup fix FileIOTest.testMatchWatchForNewFiles flaky

Posted by GitBox <gi...@apache.org>.
asf-ci commented on PR #21877:
URL: https://github.com/apache/beam/pull/21877#issuecomment-1155839709

   Can one of the admins verify this patch?




[GitHub] [beam] kennknowles commented on a diff in pull request #21877: Followup fix FileIOTest.testMatchWatchForNewFiles flaky

Posted by GitBox <gi...@apache.org>.
kennknowles commented on code in PR #21877:
URL: https://github.com/apache/beam/pull/21877#discussion_r905361732


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsMatchIT.java:
##########
@@ -25,128 +25,171 @@
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
-import java.util.Arrays;
+import java.nio.file.Path;
 import java.util.Collections;
 import java.util.Date;
-import java.util.List;
-import org.apache.beam.runners.direct.DirectRunner;
+import java.util.Objects;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.io.FileIO;
 import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Watch;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
 import org.joda.time.Duration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
 public class GcsMatchIT {
-  /** DoFn that writes test files to Gcs System when first time called. */
-  private static class WriteToGcsFn extends DoFn<GcsPath, Void> {
-    public WriteToGcsFn(long waitSec) {
-      this.waitSec = waitSec;
+  /** A thread that write to Gcs continuously. */
+  private static class WriteToPathContinuously extends Thread {
+    public WriteToPathContinuously(Path writePath, long interval) {
+      this.writePath = writePath;
+      this.interval = interval;
     }
 
-    @ProcessElement
-    public void processElement(ProcessContext context) {
-      GcsPath writePath = context.element();
-      assert writePath != null;
-      Thread writer =
-          new Thread(
-              () -> {
-                try {
-                  // Write test files to writePath
-                  Thread.sleep(waitSec * 1000);
-                  writeBytesToFile(writePath.resolve("first").toString(), 42);
-                  Thread.sleep(1000);
-                  writeBytesToFile(writePath.resolve("first").toString(), 99);
-                  writeBytesToFile(writePath.resolve("second").toString(), 42);
-                  Thread.sleep(1000);
-                  writeBytesToFile(writePath.resolve("first").toString(), 37);
-                  writeBytesToFile(writePath.resolve("second").toString(), 42);
-                  writeBytesToFile(writePath.resolve("third").toString(), 99);
-                } catch (InterruptedException e) {
-                  throw new RuntimeException(e);
-                }
-              });
-      writer.start();
+    @Override
+    public void run() {
+      int fileSize = 1;
+      // write a file at the beginning
+      writeBytesToFile(writePath.resolve("first").toString(), fileSize);
+
+      while (true) {
+        try {
+          Thread.sleep(interval);
+        } catch (InterruptedException e) {
+          return;
+        }
+        // write another file continuously
+        writeBytesToFile(writePath.resolve("second").toString(), fileSize);
+        fileSize += 1;

Review Comment:
   You could also put a timeout on the `@Test` as a backstop, though you have to make it big enough that it will finish in any reasonable configuration.
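In JUnit 4 this backstop is the `timeout` attribute of `@Test`, given in milliseconds (for example `@Test(timeout = 600_000)`; that value is a placeholder, not one taken from this PR). The sketch below shows the same idea in plain Java so that it runs without JUnit: the test body runs on a worker thread and is cancelled if it overruns. `finishesWithin` is a hypothetical helper name.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutBackstopSketch {
  /** Runs body on a worker thread; returns true if it finished within timeoutMillis. */
  static boolean finishesWithin(Runnable body, long timeoutMillis) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      Future<?> result = executor.submit(body);
      try {
        result.get(timeoutMillis, TimeUnit.MILLISECONDS);
        return true;
      } catch (TimeoutException e) {
        result.cancel(true); // interrupt the runaway body
        return false;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      } catch (ExecutionException e) {
        throw new IllegalStateException("body failed", e.getCause());
      }
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    boolean fast = finishesWithin(() -> {}, 1_000);
    boolean stuck =
        finishesWithin(
            () -> {
              try {
                Thread.sleep(60_000); // simulates a writer loop that never ends
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
            },
            100);
    System.out.println(fast + " " + stuck); // true false
  }
}
```

As the review comment points out, the limit must be generous enough for the slowest configuration the test runs in (for example Dataflow stage startup); otherwise the backstop itself becomes a new source of flakiness.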





[GitHub] [beam] asf-ci commented on pull request #21877: Followup fix FileIOTest.testMatchWatchForNewFiles flaky

Posted by GitBox <gi...@apache.org>.
asf-ci commented on PR #21877:
URL: https://github.com/apache/beam/pull/21877#issuecomment-1155839713

   Can one of the admins verify this patch?




[GitHub] [beam] Abacn commented on pull request #21877: Followup fix FileIOTest.testMatchWatchForNewFiles flaky

Posted by GitBox <gi...@apache.org>.
Abacn commented on PR #21877:
URL: https://github.com/apache/beam/pull/21877#issuecomment-1156831237

   Run Java PostCommit




[GitHub] [beam] Abacn commented on pull request #21877: Followup fix FileIOTest.testMatchWatchForNewFiles flaky

Posted by GitBox <gi...@apache.org>.
Abacn commented on PR #21877:
URL: https://github.com/apache/beam/pull/21877#issuecomment-1156831079

   R: @johnjcasey 

