You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/23 18:57:50 UTC

[GitHub] [beam] kennknowles commented on a diff in pull request #21877: Followup fix FileIOTest.testMatchWatchForNewFiles flaky

kennknowles commented on code in PR #21877:
URL: https://github.com/apache/beam/pull/21877#discussion_r905361732


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsMatchIT.java:
##########
@@ -25,128 +25,171 @@
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
-import java.util.Arrays;
+import java.nio.file.Path;
 import java.util.Collections;
 import java.util.Date;
-import java.util.List;
-import org.apache.beam.runners.direct.DirectRunner;
+import java.util.Objects;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.io.FileIO;
 import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Watch;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
 import org.joda.time.Duration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
 public class GcsMatchIT {
-  /** DoFn that writes test files to Gcs System when first time called. */
-  private static class WriteToGcsFn extends DoFn<GcsPath, Void> {
-    public WriteToGcsFn(long waitSec) {
-      this.waitSec = waitSec;
+  /** A thread that write to Gcs continuously. */
+  private static class WriteToPathContinuously extends Thread {
+    public WriteToPathContinuously(Path writePath, long interval) {
+      this.writePath = writePath;
+      this.interval = interval;
     }
 
-    @ProcessElement
-    public void processElement(ProcessContext context) {
-      GcsPath writePath = context.element();
-      assert writePath != null;
-      Thread writer =
-          new Thread(
-              () -> {
-                try {
-                  // Write test files to writePath
-                  Thread.sleep(waitSec * 1000);
-                  writeBytesToFile(writePath.resolve("first").toString(), 42);
-                  Thread.sleep(1000);
-                  writeBytesToFile(writePath.resolve("first").toString(), 99);
-                  writeBytesToFile(writePath.resolve("second").toString(), 42);
-                  Thread.sleep(1000);
-                  writeBytesToFile(writePath.resolve("first").toString(), 37);
-                  writeBytesToFile(writePath.resolve("second").toString(), 42);
-                  writeBytesToFile(writePath.resolve("third").toString(), 99);
-                } catch (InterruptedException e) {
-                  throw new RuntimeException(e);
-                }
-              });
-      writer.start();
+    @Override
+    public void run() {
+      int fileSize = 1;
+      // write a file at the beginning
+      writeBytesToFile(writePath.resolve("first").toString(), fileSize);
+
+      while (true) {
+        try {
+          Thread.sleep(interval);
+        } catch (InterruptedException e) {
+          return;
+        }
+        // write another file continuously
+        writeBytesToFile(writePath.resolve("second").toString(), fileSize);
+        fileSize += 1;

Review Comment:
   You could also put a timeout on the `@Test` as a backstop, though the timeout has to be large enough that the test still finishes under any reasonable configuration.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org