Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/12/01 01:56:43 UTC

[GitHub] [beam] TheNeuralBit opened a new pull request #13448: [BEAM-9187] Address flake in loadBalancesBundles

TheNeuralBit opened a new pull request #13448:
URL: https://github.com/apache/beam/pull/13448


   See my comment on BEAM-9187 for an explanation of what seems to have caused the flakes:
   
   > In the failing case, it seems that as soon as the forked thread releases the semaphore [here](https://github.com/apache/beam/blob/5e9237cd425da92a98536ba598a6217c21b31286/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java#L560), the main thread continues, [creating the bundle](https://github.com/apache/beam/blob/5e9237cd425da92a98536ba598a6217c21b31286/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java#L478) and then making the assertions. However, because execution has left the forked thread, that thread has not yet returned from the `b2.close()` call and [set the boolean](https://github.com/apache/beam/blob/5e9237cd425da92a98536ba598a6217c21b31286/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java#L470) by the time the assertions run.
   
   This PR addresses it by replacing the `TimerTask` and `AtomicBoolean` with a `ScheduledFuture`. We then make an assertion on the result of the scheduled future, including a small delay to make sure the main thread will block and give the forked thread a chance to run.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] TheNeuralBit commented on a change in pull request #13448: [BEAM-9187] Address flake in loadBalancesBundles

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #13448:
URL: https://github.com/apache/beam/pull/13448#discussion_r533049574



##########
File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java
##########
@@ -459,27 +461,28 @@ public void loadBalancesBundles() throws Exception {
       verify(envFactory, Mockito.times(2)).createEnvironment(eq(environment), any());
 
       long tms = System.currentTimeMillis();
-      AtomicBoolean closed = new AtomicBoolean();
-      // close to free up environment for another bundle
-      TimerTask closeBundleTask =
-          new TimerTask() {
-            @Override
-            public void run() {
-              try {
-                b2.close();
-                closed.set(true);
-              } catch (Exception e) {
-                throw new RuntimeException(e);
-              }
-            }
-          };
-      new Timer().schedule(closeBundleTask, 100);
-
+      ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+      ScheduledFuture<Optional<Exception>> closingFuture =
+          executor.schedule(
+              () -> {
+                try {
+                  b2.close();
+                  return Optional.empty();
+                } catch (Exception e) {
+                  return Optional.of(e);
+                }
+              },
+              100,
+              TimeUnit.MILLISECONDS);
+
+      // This call should block until closingFuture has finished closing b2
       RemoteBundle b3 = sbf.getBundle(orf, srh, BundleProgressHandler.ignored());
 
-      // ensure we waited for close
+      // ensure the previous call waited for close
       Assert.assertThat(System.currentTimeMillis() - tms, greaterThanOrEqualTo(100L));
-      Assert.assertThat(closed.get(), is(true));
+      // This assertion includes a small delay to give the forked thread a chance to finish if it's
+      // been blocked
+      Assert.assertThat(closingFuture.get(1, TimeUnit.MILLISECONDS), equalTo(Optional.empty()));

Review comment:
       Yeah, agreed, a small delay isn't ideal. My concern with blocking indefinitely is that it could return much later and the test would still pass, which wouldn't be correct. It _should_ be done by then.
   
   Maybe I'm just splitting hairs: the prior assertion on system time is pretty strong, so we could just do without this one.







[GitHub] [beam] apilloud commented on a change in pull request #13448: [BEAM-9187] Address flake in loadBalancesBundles

Posted by GitBox <gi...@apache.org>.
apilloud commented on a change in pull request #13448:
URL: https://github.com/apache/beam/pull/13448#discussion_r533024977



##########
File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java
##########
@@ -459,27 +461,28 @@ public void loadBalancesBundles() throws Exception {
       verify(envFactory, Mockito.times(2)).createEnvironment(eq(environment), any());
 
       long tms = System.currentTimeMillis();
-      AtomicBoolean closed = new AtomicBoolean();
-      // close to free up environment for another bundle
-      TimerTask closeBundleTask =
-          new TimerTask() {
-            @Override
-            public void run() {
-              try {
-                b2.close();
-                closed.set(true);
-              } catch (Exception e) {
-                throw new RuntimeException(e);
-              }
-            }
-          };
-      new Timer().schedule(closeBundleTask, 100);
-
+      ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+      ScheduledFuture<Optional<Exception>> closingFuture =
+          executor.schedule(
+              () -> {
+                try {
+                  b2.close();
+                  return Optional.empty();
+                } catch (Exception e) {

Review comment:
       nit: You can drop this, `Future` will catch exceptions, wrap them in an `ExecutionException` and throw them out of `get`.
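
The behavior described in this comment can be sketched in isolation. This is a minimal illustration, not code from the PR: the class name `FutureExceptionDemo` and the thrown `IOException` are made up for the example. It assumes the task is submitted as a `Callable`, whose `call()` method is declared to throw `Exception`.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureExceptionDemo {
  /** Runs a task that throws a checked exception and returns the cause reported by get(). */
  static Throwable causeSeenByGet() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      // Hypothetical failing task; Callable.call() is allowed to throw checked exceptions.
      Callable<String> failingTask =
          () -> {
            throw new IOException("close failed");
          };
      Future<String> future = executor.submit(failingTask);
      try {
        future.get();
        return null; // not reached: the task always throws
      } catch (ExecutionException e) {
        // The original exception is preserved as the cause.
        return e.getCause();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(causeSeenByGet());
  }
}
```

With this pattern a test does not need its own `try`/`catch` inside the task: a failure surfaces when the test calls `get`.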

##########
File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java
##########
@@ -459,27 +461,28 @@ public void loadBalancesBundles() throws Exception {
       verify(envFactory, Mockito.times(2)).createEnvironment(eq(environment), any());
 
       long tms = System.currentTimeMillis();
-      AtomicBoolean closed = new AtomicBoolean();
-      // close to free up environment for another bundle
-      TimerTask closeBundleTask =
-          new TimerTask() {
-            @Override
-            public void run() {
-              try {
-                b2.close();
-                closed.set(true);
-              } catch (Exception e) {
-                throw new RuntimeException(e);
-              }
-            }
-          };
-      new Timer().schedule(closeBundleTask, 100);
-
+      ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+      ScheduledFuture<Optional<Exception>> closingFuture =
+          executor.schedule(
+              () -> {
+                try {
+                  b2.close();
+                  return Optional.empty();
+                } catch (Exception e) {
+                  return Optional.of(e);
+                }
+              },
+              100,
+              TimeUnit.MILLISECONDS);
+
+      // This call should block until closingFuture has finished closing b2
       RemoteBundle b3 = sbf.getBundle(orf, srh, BundleProgressHandler.ignored());
 
-      // ensure we waited for close
+      // ensure the previous call waited for close
       Assert.assertThat(System.currentTimeMillis() - tms, greaterThanOrEqualTo(100L));
-      Assert.assertThat(closed.get(), is(true));
+      // This assertion includes a small delay to give the forked thread a chance to finish if it's
+      // been blocked
+      Assert.assertThat(closingFuture.get(1, TimeUnit.MILLISECONDS), equalTo(Optional.empty()));

Review comment:
       nit: Small delays lead to flakes, for example if the system is busy and doesn't get back to the forked thread. Does having a small timeout add any value here? Could you just block?







[GitHub] [beam] apilloud commented on a change in pull request #13448: [BEAM-9187] Address flake in loadBalancesBundles

Posted by GitBox <gi...@apache.org>.
apilloud commented on a change in pull request #13448:
URL: https://github.com/apache/beam/pull/13448#discussion_r533606642



##########
File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java
##########
@@ -459,27 +461,28 @@ public void loadBalancesBundles() throws Exception {
       verify(envFactory, Mockito.times(2)).createEnvironment(eq(environment), any());
 
       long tms = System.currentTimeMillis();
-      AtomicBoolean closed = new AtomicBoolean();
-      // close to free up environment for another bundle
-      TimerTask closeBundleTask =
-          new TimerTask() {
-            @Override
-            public void run() {
-              try {
-                b2.close();
-                closed.set(true);
-              } catch (Exception e) {
-                throw new RuntimeException(e);
-              }
-            }
-          };
-      new Timer().schedule(closeBundleTask, 100);
-
+      ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+      ScheduledFuture<Optional<Exception>> closingFuture =
+          executor.schedule(
+              () -> {
+                try {
+                  b2.close();
+                  return Optional.empty();
+                } catch (Exception e) {

Review comment:
       Yes, only unchecked exceptions. It is odd to me that the Java `AutoCloseable` interface declares `close()` as throwing `Exception`, but it does.







[GitHub] [beam] TheNeuralBit commented on a change in pull request #13448: [BEAM-9187] Address flake in loadBalancesBundles

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #13448:
URL: https://github.com/apache/beam/pull/13448#discussion_r550345692



##########
File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java
##########
@@ -459,27 +461,28 @@ public void loadBalancesBundles() throws Exception {
       verify(envFactory, Mockito.times(2)).createEnvironment(eq(environment), any());
 
       long tms = System.currentTimeMillis();
-      AtomicBoolean closed = new AtomicBoolean();
-      // close to free up environment for another bundle
-      TimerTask closeBundleTask =
-          new TimerTask() {
-            @Override
-            public void run() {
-              try {
-                b2.close();
-                closed.set(true);
-              } catch (Exception e) {
-                throw new RuntimeException(e);
-              }
-            }
-          };
-      new Timer().schedule(closeBundleTask, 100);
-
+      ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+      ScheduledFuture<Optional<Exception>> closingFuture =
+          executor.schedule(
+              () -> {
+                try {
+                  b2.close();
+                  return Optional.empty();
+                } catch (Exception e) {
+                  return Optional.of(e);
+                }
+              },
+              100,
+              TimeUnit.MILLISECONDS);
+
+      // This call should block until closingFuture has finished closing b2
       RemoteBundle b3 = sbf.getBundle(orf, srh, BundleProgressHandler.ignored());
 
-      // ensure we waited for close
+      // ensure the previous call waited for close
       Assert.assertThat(System.currentTimeMillis() - tms, greaterThanOrEqualTo(100L));
-      Assert.assertThat(closed.get(), is(true));
+      // This assertion includes a small delay to give the forked thread a chance to finish if it's
+      // been blocked
+      Assert.assertThat(closingFuture.get(1, TimeUnit.MILLISECONDS), equalTo(Optional.empty()));

Review comment:
       OK, I did something similar to your (1), but rather than setting `tms` I just set an `AtomicBoolean` prior to calling `close`, and verify that it is set right after `getBundle` returns.
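
The approach described here might look roughly like the following. This is a hedged, self-contained sketch rather than the merged test code: a `Semaphore` with zero permits stands in for the busy environment that makes `getBundle` block, and the names `CloseStartedDemo`, `slot`, and `closing` are invented for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CloseStartedDemo {
  /** Returns true if the flag was already set when the blocking call returned. */
  static boolean closeStartedBeforeUnblock() {
    // Stand-in for the environment that is in use: no permits are available yet.
    Semaphore slot = new Semaphore(0);
    AtomicBoolean closing = new AtomicBoolean(false);
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    try {
      executor.schedule(
          () -> {
            // Set the flag BEFORE the call that unblocks the main thread, so the
            // main thread can never observe "unblocked but flag not yet set".
            closing.set(true);
            slot.release(); // stand-in for b2.close()
          },
          100,
          TimeUnit.MILLISECONDS);

      // Stand-in for sbf.getBundle(...): blocks until the scheduled task releases.
      slot.acquireUninterruptibly();
      return closing.get();
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(closeStartedBeforeUnblock());
  }
}
```

Because the flag is written before the release and read after the acquire, the check does not depend on how quickly the forked thread is rescheduled after unblocking the main thread.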







[GitHub] [beam] TheNeuralBit commented on a change in pull request #13448: [BEAM-9187] Address flake in loadBalancesBundles

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #13448:
URL: https://github.com/apache/beam/pull/13448#discussion_r533048304



##########
File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java
##########
@@ -459,27 +461,28 @@ public void loadBalancesBundles() throws Exception {
       verify(envFactory, Mockito.times(2)).createEnvironment(eq(environment), any());
 
       long tms = System.currentTimeMillis();
-      AtomicBoolean closed = new AtomicBoolean();
-      // close to free up environment for another bundle
-      TimerTask closeBundleTask =
-          new TimerTask() {
-            @Override
-            public void run() {
-              try {
-                b2.close();
-                closed.set(true);
-              } catch (Exception e) {
-                throw new RuntimeException(e);
-              }
-            }
-          };
-      new Timer().schedule(closeBundleTask, 100);
-
+      ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+      ScheduledFuture<Optional<Exception>> closingFuture =
+          executor.schedule(
+              () -> {
+                try {
+                  b2.close();
+                  return Optional.empty();
+                } catch (Exception e) {

Review comment:
       Hm, could it be that `Future` only does that for unchecked exceptions? I had tried just making this `executor.schedule(b2::close, ..)`, but it produces a compile error:
   ```
   incompatible thrown types Exception in method reference
   ```
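
One possible reading of this compile error, offered as an assumption rather than something established in the thread: `close()` returns `void`, so the method reference `b2::close` can only target the `Runnable` overload of `schedule`, and `Runnable.run()` declares no checked exceptions. A lambda that returns a value targets `Callable` instead, whose `call()` may throw `Exception`. The sketch below uses a made-up `FakeBundle` class in place of the real bundle.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ScheduleCloseDemo {
  /** Hypothetical stand-in for a bundle whose close() declares a checked exception. */
  static final class FakeBundle implements AutoCloseable {
    volatile boolean closed = false;

    @Override
    public void close() throws Exception {
      closed = true;
    }
  }

  /** Schedules bundle.close() through a Callable and reports whether it ran. */
  static boolean closeViaCallable(FakeBundle bundle) {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    try {
      // Returning a value makes this lambda a Callable, so the checked
      // exception declared by close() is permitted without a try/catch.
      Callable<Void> closeTask =
          () -> {
            bundle.close();
            return null;
          };
      ScheduledFuture<Void> future = executor.schedule(closeTask, 10, TimeUnit.MILLISECONDS);
      // Any exception thrown by close() would surface here as an ExecutionException.
      future.get(5, TimeUnit.SECONDS);
      return bundle.closed;
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(closeViaCallable(new FakeBundle()));
  }
}
```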







[GitHub] [beam] TheNeuralBit merged pull request #13448: [BEAM-9187] Address flake in loadBalancesBundles

Posted by GitBox <gi...@apache.org>.
TheNeuralBit merged pull request #13448:
URL: https://github.com/apache/beam/pull/13448


   





[GitHub] [beam] apilloud commented on a change in pull request #13448: [BEAM-9187] Address flake in loadBalancesBundles

Posted by GitBox <gi...@apache.org>.
apilloud commented on a change in pull request #13448:
URL: https://github.com/apache/beam/pull/13448#discussion_r533633896



##########
File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java
##########
@@ -459,27 +461,28 @@ public void loadBalancesBundles() throws Exception {
       verify(envFactory, Mockito.times(2)).createEnvironment(eq(environment), any());
 
       long tms = System.currentTimeMillis();
-      AtomicBoolean closed = new AtomicBoolean();
-      // close to free up environment for another bundle
-      TimerTask closeBundleTask =
-          new TimerTask() {
-            @Override
-            public void run() {
-              try {
-                b2.close();
-                closed.set(true);
-              } catch (Exception e) {
-                throw new RuntimeException(e);
-              }
-            }
-          };
-      new Timer().schedule(closeBundleTask, 100);
-
+      ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+      ScheduledFuture<Optional<Exception>> closingFuture =
+          executor.schedule(
+              () -> {
+                try {
+                  b2.close();
+                  return Optional.empty();
+                } catch (Exception e) {
+                  return Optional.of(e);
+                }
+              },
+              100,
+              TimeUnit.MILLISECONDS);
+
+      // This call should block until closingFuture has finished closing b2
       RemoteBundle b3 = sbf.getBundle(orf, srh, BundleProgressHandler.ignored());
 
-      // ensure we waited for close
+      // ensure the previous call waited for close
       Assert.assertThat(System.currentTimeMillis() - tms, greaterThanOrEqualTo(100L));
-      Assert.assertThat(closed.get(), is(true));
+      // This assertion includes a small delay to give the forked thread a chance to finish if it's
+      // been blocked
+      Assert.assertThat(closingFuture.get(1, TimeUnit.MILLISECONDS), equalTo(Optional.empty()));

Review comment:
       That does seem like a good reason to want a timeout.
   
   Here are some possible ways to tighten things down further:
   1. Set `tms` in the thread just before calling `close()`, then check that `getBundle` returns after that time. (This guards against the system running slowly and the thread being scheduled significantly later than 100 ms.)
   2. Make the 100 ms delay random. (Combined with 1, this guards against a `Thread.sleep(100L)` implementation of `getBundle`.)
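
The two suggestions above can be combined in a small stand-alone sketch. It is illustrative only and makes several assumptions: a `Semaphore` plays the role of the blocking `getBundle` call, the delay range of 50 to 150 ms is arbitrary, `System.nanoTime()` is used instead of `System.currentTimeMillis()` because it is monotonic, and all names are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class RandomDelayDemo {
  /** Returns true if the blocking call returned no earlier than the recorded close start. */
  static boolean unblockedAfterCloseStarted() {
    Semaphore slot = new Semaphore(0); // stand-in for the busy environment
    AtomicReference<Long> closeStartedNanos = new AtomicReference<>(null);
    // Suggestion 2: a random delay, so a fixed sleep in the code under test cannot pass by luck.
    long delayMs = ThreadLocalRandom.current().nextLong(50, 150);
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    try {
      executor.schedule(
          () -> {
            // Suggestion 1: record the time in the forked thread just before "close".
            closeStartedNanos.set(System.nanoTime());
            slot.release(); // stand-in for b2.close()
          },
          delayMs,
          TimeUnit.MILLISECONDS);

      slot.acquireUninterruptibly(); // stand-in for sbf.getBundle(...)
      long returnedNanos = System.nanoTime();

      Long started = closeStartedNanos.get();
      // Compare via subtraction, as recommended for nanoTime values.
      return started != null && returnedNanos - started >= 0;
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(unblockedAfterCloseStarted());
  }
}
```

Comparing against a timestamp taken in the forked thread, rather than against a fixed 100 ms offset from the main thread's start time, keeps the check meaningful even when the scheduler runs the task late.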



