You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/15 20:12:16 UTC

[GitHub] [beam] Abacn commented on a diff in pull request #21877: Followup fix FileIOTest.testMatchWatchForNewFiles flaky

Abacn commented on code in PR #21877:
URL: https://github.com/apache/beam/pull/21877#discussion_r898376736


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsMatchIT.java:
##########
@@ -25,128 +25,171 @@
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
-import java.util.Arrays;
+import java.nio.file.Path;
 import java.util.Collections;
 import java.util.Date;
-import java.util.List;
-import org.apache.beam.runners.direct.DirectRunner;
+import java.util.Objects;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.io.FileIO;
 import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Watch;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
 import org.joda.time.Duration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
 public class GcsMatchIT {
-  /** DoFn that writes test files to Gcs System when first time called. */
-  private static class WriteToGcsFn extends DoFn<GcsPath, Void> {
-    public WriteToGcsFn(long waitSec) {
-      this.waitSec = waitSec;
+  /** A thread that write to Gcs continuously. */
+  private static class WriteToPathContinuously extends Thread {
+    public WriteToPathContinuously(Path writePath, long interval) {
+      this.writePath = writePath;
+      this.interval = interval;
     }
 
-    @ProcessElement
-    public void processElement(ProcessContext context) {
-      GcsPath writePath = context.element();
-      assert writePath != null;
-      Thread writer =
-          new Thread(
-              () -> {
-                try {
-                  // Write test files to writePath
-                  Thread.sleep(waitSec * 1000);
-                  writeBytesToFile(writePath.resolve("first").toString(), 42);
-                  Thread.sleep(1000);
-                  writeBytesToFile(writePath.resolve("first").toString(), 99);
-                  writeBytesToFile(writePath.resolve("second").toString(), 42);
-                  Thread.sleep(1000);
-                  writeBytesToFile(writePath.resolve("first").toString(), 37);
-                  writeBytesToFile(writePath.resolve("second").toString(), 42);
-                  writeBytesToFile(writePath.resolve("third").toString(), 99);
-                } catch (InterruptedException e) {
-                  throw new RuntimeException(e);
-                }
-              });
-      writer.start();
+    @Override
+    public void run() {
+      int fileSize = 1;
+      // write a file at the beginning
+      writeBytesToFile(writePath.resolve("first").toString(), fileSize);
+
+      while (true) {
+        try {
+          Thread.sleep(interval);
+        } catch (InterruptedException e) {
+          return;
+        }
+        // write another file continuously
+        writeBytesToFile(writePath.resolve("second").toString(), fileSize);
+        fileSize += 1;

Review Comment:
   Added a `!Thread.interrupted()` check to the loop condition, which is what should be used here, and also set a maximum file size.



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsMatchIT.java:
##########
@@ -25,128 +25,171 @@
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
-import java.util.Arrays;
+import java.nio.file.Path;
 import java.util.Collections;
 import java.util.Date;
-import java.util.List;
-import org.apache.beam.runners.direct.DirectRunner;
+import java.util.Objects;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.io.FileIO;
 import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Watch;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
 import org.joda.time.Duration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
 public class GcsMatchIT {
-  /** DoFn that writes test files to Gcs System when first time called. */
-  private static class WriteToGcsFn extends DoFn<GcsPath, Void> {
-    public WriteToGcsFn(long waitSec) {
-      this.waitSec = waitSec;
+  /** A thread that write to Gcs continuously. */
+  private static class WriteToPathContinuously extends Thread {
+    public WriteToPathContinuously(Path writePath, long interval) {
+      this.writePath = writePath;
+      this.interval = interval;
     }
 
-    @ProcessElement
-    public void processElement(ProcessContext context) {
-      GcsPath writePath = context.element();
-      assert writePath != null;
-      Thread writer =
-          new Thread(
-              () -> {
-                try {
-                  // Write test files to writePath
-                  Thread.sleep(waitSec * 1000);
-                  writeBytesToFile(writePath.resolve("first").toString(), 42);
-                  Thread.sleep(1000);
-                  writeBytesToFile(writePath.resolve("first").toString(), 99);
-                  writeBytesToFile(writePath.resolve("second").toString(), 42);
-                  Thread.sleep(1000);
-                  writeBytesToFile(writePath.resolve("first").toString(), 37);
-                  writeBytesToFile(writePath.resolve("second").toString(), 42);
-                  writeBytesToFile(writePath.resolve("third").toString(), 99);
-                } catch (InterruptedException e) {
-                  throw new RuntimeException(e);
-                }
-              });
-      writer.start();
+    @Override
+    public void run() {
+      int fileSize = 1;
+      // write a file at the beginning
+      writeBytesToFile(writePath.resolve("first").toString(), fileSize);
+
+      while (true) {
+        try {
+          Thread.sleep(interval);
+        } catch (InterruptedException e) {
+          return;

Review Comment:
   Thanks, fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org