Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2020/03/09 20:19:04 UTC

[GitHub] [beam] mxm opened a new pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

mxm opened a new pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084
 
 
   The cleanup code in DefaultJobBundleFactory and its RemoteEnvironments may leak
   resources. This is especially a concern when the execution engine reuses the
   same JVM or underlying machines for multiple runs of a pipeline.
   
   Exceptions encountered during cleanup should not lead to aborting the cleanup
   procedure. Not all code handles this correctly. We should also ensure that the
   cleanup succeeds even if the runner does not properly close the bundle,
   e.g. when an exception occurs while closing the bundle.
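
The "keep going on failure" rule above can be illustrated with a minimal Java sketch. It is not the code from this pull request: `CleanupSketch` and `closeAll` are hypothetical names, and the resources are plain `AutoCloseable` instances rather than Beam environments. The first failure is remembered, later ones are attached via `Throwable.addSuppressed`, and the exception is rethrown only after every resource has been given a chance to close.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Beam: closes all resources even if some fail.
class CleanupSketch {

  /** Closes every resource in order; rethrows the first failure with the rest suppressed. */
  static void closeAll(List<? extends AutoCloseable> resources) throws Exception {
    Exception first = null;
    for (AutoCloseable resource : resources) {
      try {
        resource.close();
      } catch (Exception e) {
        if (first == null) {
          first = e; // remember the first failure
        } else {
          first.addSuppressed(e); // keep later failures without losing the first
        }
      }
    }
    if (first != null) {
      throw first; // only thrown after all resources were attempted
    }
  }

  public static void main(String[] args) {
    AutoCloseable ok = () -> System.out.println("closed cleanly");
    AutoCloseable failing = () -> {
      throw new IllegalStateException("close failed");
    };
    try {
      closeAll(Arrays.asList(failing, ok, failing));
    } catch (Exception e) {
      System.out.println(e.getMessage() + ", suppressed: " + e.getSuppressed().length);
    }
  }
}
```

A caller sees a single exception, yet no resource is skipped because an earlier one failed to close.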
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r391330585
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java
 ##########
 @@ -186,31 +186,28 @@ private void stopProcess(String id, Process process) {
       LOG.debug("Attempting to stop process with id {}", id);
       // first try to kill gracefully
       process.destroy();
-      long maxTimeToWait = 2000;
-      if (waitForProcessToDie(process, maxTimeToWait)) {
-        LOG.debug("Process for worker {} shut down gracefully.", id);
-      } else {
-        LOG.info("Process for worker {} still running. Killing.", id);
-        process.destroyForcibly();
+      long maxTimeToWait = 500;
 
 Review comment:
   Won't delay the PR for it!


[GitHub] [beam] mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r389998961
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java
 ##########
 @@ -186,31 +186,28 @@ private void stopProcess(String id, Process process) {
       LOG.debug("Attempting to stop process with id {}", id);
       // first try to kill gracefully
       process.destroy();
-      long maxTimeToWait = 2000;
-      if (waitForProcessToDie(process, maxTimeToWait)) {
-        LOG.debug("Process for worker {} shut down gracefully.", id);
-      } else {
-        LOG.info("Process for worker {} still running. Killing.", id);
-        process.destroyForcibly();
+      long maxTimeToWait = 500;
 
 Review comment:
   The old timeout was too long.


[GitHub] [beam] mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r390483917
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -375,6 +428,11 @@ public RemoteBundle getBundle(
         }
       }
 
+      // Ensure client is referenced for this bundle, unref in close()
+      client.ref();
+      // Cleanup list of clients which were active during eviction but now do not hold references
+      evictedActiveClients.removeIf(c -> c.bundleRefCount.get() <= 0);
 
 Review comment:
   This is a cheap operation. I don't think we can move it because we need to run this cleanup for all branches.
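
To illustrate why this pruning step is inexpensive, here is a minimal sketch, assuming the evicted clients are kept in a short list and each one exposes an `AtomicInteger` reference count. `EvictedClientPruning` and its nested `Client` class are hypothetical stand-ins, not the actual `WrappedSdkHarnessClient`. The cleanup is a single `removeIf` pass that reads one counter per element.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of pruning clients that no longer hold bundle references.
class EvictedClientPruning {

  /** Stand-in for a client that tracks how many bundles currently use it. */
  static final class Client {
    final String name;
    final AtomicInteger bundleRefCount;

    Client(String name, int initialRefs) {
      this.name = name;
      this.bundleRefCount = new AtomicInteger(initialRefs);
    }
  }

  /** Removes clients without references in one linear pass; returns how many were removed. */
  static int prune(List<Client> evictedActiveClients) {
    int before = evictedActiveClients.size();
    evictedActiveClients.removeIf(c -> c.bundleRefCount.get() <= 0);
    return before - evictedActiveClients.size();
  }

  public static void main(String[] args) {
    List<Client> clients = new ArrayList<>();
    clients.add(new Client("still-in-use", 2));
    clients.add(new Client("finished", 0));
    System.out.println("pruned " + prune(clients) + ", remaining " + clients.size());
  }
}
```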


[GitHub] [beam] mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r391225776
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -352,20 +407,18 @@ public RemoteBundle getBundle(
         // The blocking queue of caches for serving multiple bundles concurrently.
         currentCache = availableCaches.take();
         client = currentCache.getUnchecked(executableStage.getEnvironment());
-        client.ref();
 
 Review comment:
   It's not true that a later `ref()` introduces a bug for `preparedClients.keySet().removeIf(c -> c.bundleRefCount.get() <= 0);` because the refcount will be >0, otherwise we wouldn't be able to retrieve the client from the cache.
   
   In any case, I will revert this change. 


[GitHub] [beam] tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r390477363
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -375,6 +428,11 @@ public RemoteBundle getBundle(
         }
       }
 
+      // Ensure client is referenced for this bundle, unref in close()
+      client.ref();
+      // Cleanup list of clients which were active during eviction but now do not hold references
+      evictedActiveClients.removeIf(c -> c.bundleRefCount.get() <= 0);
 
 Review comment:
   I'm not sure I like this in the path for every bundle. We could probably move it below line 416 (`preparedClients.keySet().removeIf(c -> c.bundleRefCount.get() <= 0);`) 


[GitHub] [beam] tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r390093798
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -166,11 +168,20 @@ public static DefaultJobBundleFactory create(
         CacheBuilder.newBuilder()
             .removalListener(
                 (RemovalNotification<Environment, WrappedSdkHarnessClient> notification) -> {
-                  int refCount = notification.getValue().unref();
-                  LOG.debug(
-                      "Removed environment {} with {} remaining bundle references.",
-                      notification.getKey(),
-                      refCount);
+                  WrappedSdkHarnessClient client = notification.getValue();
 
 Review comment:
   The correct solution might be to enforce this from close, not from the cache removal listener.


[GitHub] [beam] tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r390477875
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -352,20 +407,18 @@ public RemoteBundle getBundle(
         // The blocking queue of caches for serving multiple bundles concurrently.
         currentCache = availableCaches.take();
         client = currentCache.getUnchecked(executableStage.getEnvironment());
-        client.ref();
 
         currentClient = preparedClients.get(client);
         if (currentClient == null) {
           // we are using this client for the first time
           preparedClients.put(client, currentClient = prepare(client, executableStage));
           // cleanup any expired clients
-          preparedClients.keySet().removeIf(c -> c.bundleRefCount.get() == 0);
+          preparedClients.keySet().removeIf(c -> c.bundleRefCount.get() <= 0);
 
 Review comment:
   Why the change from `==` to `<=`?


[GitHub] [beam] mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r391226903
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -352,20 +407,18 @@ public RemoteBundle getBundle(
         // The blocking queue of caches for serving multiple bundles concurrently.
         currentCache = availableCaches.take();
         client = currentCache.getUnchecked(executableStage.getEnvironment());
-        client.ref();
 
         currentClient = preparedClients.get(client);
         if (currentClient == null) {
           // we are using this client for the first time
           preparedClients.put(client, currentClient = prepare(client, executableStage));
           // cleanup any expired clients
-          preparedClients.keySet().removeIf(c -> c.bundleRefCount.get() == 0);
+          preparedClients.keySet().removeIf(c -> c.bundleRefCount.get() <= 0);
 
 Review comment:
   I'll also revert this change to not distract from the main objective of this PR.


[GitHub] [beam] mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r390011274
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -166,11 +168,20 @@ public static DefaultJobBundleFactory create(
         CacheBuilder.newBuilder()
             .removalListener(
                 (RemovalNotification<Environment, WrappedSdkHarnessClient> notification) -> {
-                  int refCount = notification.getValue().unref();
-                  LOG.debug(
-                      "Removed environment {} with {} remaining bundle references.",
-                      notification.getKey(),
-                      refCount);
+                  WrappedSdkHarnessClient client = notification.getValue();
 
 Review comment:
   I've removed the reference counting, PTAL.


[GitHub] [beam] mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r389999133
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -252,14 +263,47 @@ public StageBundleFactory forStage(ExecutableStage executableStage) {
   }
 
   @Override
-  public void close() throws Exception {
-    // Clear the cache. This closes all active environments.
-    // note this may cause open calls to be cancelled by the peer
-    for (LoadingCache<Environment, WrappedSdkHarnessClient> environmentCache : environmentCaches) {
-      environmentCache.invalidateAll();
-      environmentCache.cleanUp();
+  public synchronized void close() throws Exception {
+    if (closed) {
+      return;
+    }
+    Exception exception = null;
 
 Review comment:
   Good point.


[GitHub] [beam] tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r391210906
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java
 ##########
 @@ -186,31 +186,28 @@ private void stopProcess(String id, Process process) {
       LOG.debug("Attempting to stop process with id {}", id);
       // first try to kill gracefully
       process.destroy();
-      long maxTimeToWait = 2000;
-      if (waitForProcessToDie(process, maxTimeToWait)) {
-        LOG.debug("Process for worker {} shut down gracefully.", id);
-      } else {
-        LOG.info("Process for worker {} still running. Killing.", id);
-        process.destroyForcibly();
+      long maxTimeToWait = 500;
 
 Review comment:
   If you have verified that the graceful shutdown works (in the happy path), then we are good. Maybe add a comment to the code, since all of this isn't very obvious.


[GitHub] [beam] tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r389982978
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -252,14 +263,47 @@ public StageBundleFactory forStage(ExecutableStage executableStage) {
   }
 
   @Override
-  public void close() throws Exception {
-    // Clear the cache. This closes all active environments.
-    // note this may cause open calls to be cancelled by the peer
-    for (LoadingCache<Environment, WrappedSdkHarnessClient> environmentCache : environmentCaches) {
-      environmentCache.invalidateAll();
-      environmentCache.cleanUp();
+  public synchronized void close() throws Exception {
+    if (closed) {
+      return;
+    }
+    Exception exception = null;
+    try {
+      for (LoadingCache<Environment, WrappedSdkHarnessClient> environmentCache :
+          environmentCaches) {
+        try {
+          // Clear the cache. This closes all active environments.
+          // note this may cause open calls to be cancelled by the peer
+          environmentCache.invalidateAll();
+          environmentCache.cleanUp();
+        } catch (Exception e) {
+          if (exception != null) {
+            exception.addSuppressed(e);
+          } else {
+            exception = e;
+          }
+        }
+      }
+      try {
+        executor.shutdown();
+      } catch (Exception e) {
+        if (exception != null) {
+          exception.addSuppressed(e);
+        } else {
+          exception = e;
+        }
+      }
+    } catch (Exception e) {
 
 Review comment:
   The outer try/catch isn't necessary since you already have it nested.


[GitHub] [beam] mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r389998500
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -252,14 +263,47 @@ public StageBundleFactory forStage(ExecutableStage executableStage) {
   }
 
   @Override
-  public void close() throws Exception {
-    // Clear the cache. This closes all active environments.
-    // note this may cause open calls to be cancelled by the peer
-    for (LoadingCache<Environment, WrappedSdkHarnessClient> environmentCache : environmentCaches) {
-      environmentCache.invalidateAll();
-      environmentCache.cleanUp();
+  public synchronized void close() throws Exception {
+    if (closed) {
+      return;
+    }
+    Exception exception = null;
+    try {
+      for (LoadingCache<Environment, WrappedSdkHarnessClient> environmentCache :
+          environmentCaches) {
+        try {
+          // Clear the cache. This closes all active environments.
+          // note this may cause open calls to be cancelled by the peer
+          environmentCache.invalidateAll();
+          environmentCache.cleanUp();
+        } catch (Exception e) {
+          if (exception != null) {
+            exception.addSuppressed(e);
+          } else {
+            exception = e;
+          }
+        }
+      }
+      try {
+        executor.shutdown();
+      } catch (Exception e) {
+        if (exception != null) {
+          exception.addSuppressed(e);
+        } else {
+          exception = e;
+        }
+      }
+    } catch (Exception e) {
 
 Review comment:
   Not strictly, but the iterator could throw. I'm trying to be extra defensive here.


[GitHub] [beam] mxm commented on issue #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
mxm commented on issue #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#issuecomment-596767134
 
 
   retest this please


[GitHub] [beam] tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r390093216
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -166,11 +168,20 @@ public static DefaultJobBundleFactory create(
         CacheBuilder.newBuilder()
             .removalListener(
                 (RemovalNotification<Environment, WrappedSdkHarnessClient> notification) -> {
-                  int refCount = notification.getValue().unref();
-                  LOG.debug(
-                      "Removed environment {} with {} remaining bundle references.",
-                      notification.getKey(),
-                      refCount);
+                  WrappedSdkHarnessClient client = notification.getValue();
 
 Review comment:
   As stated above, reference counting is required for environment expiration. The environment can only be closed when all bundles that reference it have finished. I would prefer we limit this PR scope strictly to the original purpose and discuss other changes separately.
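
The expiration rule described here can be sketched as follows. This is an illustration under assumptions, not Beam's implementation: `RefCountedEnvironment` is a hypothetical class, the count is assumed to start at 1 for the cache's own reference, and the `onRelease` callback stands in for closing the environment.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: an environment that is released only after the cache
// and every bundle have given up their references.
class RefCountedEnvironment {
  // Assumption: starts at 1, representing the reference held by the cache.
  private final AtomicInteger refCount = new AtomicInteger(1);
  private final Runnable onRelease;

  RefCountedEnvironment(Runnable onRelease) {
    this.onRelease = onRelease;
  }

  /** Called when a bundle starts using the environment. */
  int ref() {
    return refCount.incrementAndGet();
  }

  /** Called when a bundle finishes or the cache evicts the entry. */
  int unref() {
    int remaining = refCount.decrementAndGet();
    if (remaining == 0) {
      onRelease.run(); // last reference gone: now it is safe to close
    }
    return remaining;
  }

  public static void main(String[] args) {
    RefCountedEnvironment env =
        new RefCountedEnvironment(() -> System.out.println("environment closed"));
    env.ref();   // a bundle starts
    env.unref(); // the cache evicts the entry while the bundle is still running
    env.unref(); // the bundle finishes, which triggers the release
  }
}
```

With this scheme, eviction from the cache alone does not close an environment that an in-flight bundle still depends on.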


[GitHub] [beam] mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r391247459
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java
 ##########
 @@ -186,31 +186,28 @@ private void stopProcess(String id, Process process) {
       LOG.debug("Attempting to stop process with id {}", id);
       // first try to kill gracefully
       process.destroy();
-      long maxTimeToWait = 2000;
-      if (waitForProcessToDie(process, maxTimeToWait)) {
-        LOG.debug("Process for worker {} shut down gracefully.", id);
-      } else {
-        LOG.info("Process for worker {} still running. Killing.", id);
-        process.destroyForcibly();
+      long maxTimeToWait = 500;
 
 Review comment:
   It does not always shut down gracefully, but that is what the change is about: removing processes and ensuring a quick recovery time. It's a trade-off. Ideally we would allow more time, but if we wait up to 2 seconds per process with an SDK parallelism of 16, that is already up to 32 seconds of waiting, i.e. more than half a minute. We really want to do the process removal in parallel. I'll look into this.
   
   I'm not sure the ProcessManager is a good place to document the shutdown behavior. If you have any suggestions though, I'll add them here.
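
The arithmetic and the "stop in parallel" idea from the comment above can be sketched roughly as follows. This is only an illustration under stated assumptions: `ParallelProcessStopper`, `worstCaseSequentialMillis`, and `stopAll` are hypothetical names and not part of Beam's ProcessManager. Only the standard `java.lang.Process` methods `destroy()`, `waitFor(long, TimeUnit)`, `isAlive()`, and `destroyForcibly()` are used.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration; not part of Beam's ProcessManager. */
final class ParallelProcessStopper {

  /** Worst case when each process is stopped and awaited one after another. */
  static long worstCaseSequentialMillis(int processCount, long perProcessTimeoutMillis) {
    return processCount * perProcessTimeoutMillis;
  }

  /**
   * Signals every process first, then waits against one shared deadline, and finally
   * kills whatever is still alive. Returns the number of processes that had to be killed.
   */
  static int stopAll(List<Process> processes, long graceMillis) {
    for (Process process : processes) {
      process.destroy(); // polite request; does not block
    }
    long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(graceMillis);
    int killed = 0;
    for (Process process : processes) {
      boolean exited;
      try {
        long remainingNanos = Math.max(0L, deadlineNanos - System.nanoTime());
        exited = process.waitFor(remainingNanos, TimeUnit.NANOSECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // keep the interrupt visible to callers
        exited = !process.isAlive();
      }
      if (!exited) {
        process.destroyForcibly();
        killed++;
      }
    }
    return killed;
  }
}
```

With 16 processes and a 2000 ms per-process timeout, `worstCaseSequentialMillis(16, 2000)` gives 32000 ms, whereas the shared deadline in `stopAll` bounds the total wait by roughly one grace period regardless of the number of processes.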


[GitHub] [beam] tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r389980853
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironment.java
 ##########
 @@ -64,13 +63,32 @@ public InstructionRequestHandler getInstructionRequestHandler() {
   }
 
   @Override
-  public void close() throws Exception {
-    synchronized (lock) {
-      if (!isClosed) {
-        instructionHandler.close();
-        processManager.stopProcess(workerId);
-        isClosed = true;
+  public synchronized void close() throws Exception {
+    if (isClosed) {
+      return;
+    }
+    Exception exception = null;
+    try {
+      processManager.stopProcess(workerId);
+    } catch (Exception e) {
+      if (exception != null) {
 
 Review comment:
   Since `exception` is null, this should just be `exception = e`
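
For context, the null check only matters from the second cleanup step onwards; in the first catch block nothing can have been recorded yet. A minimal sketch of the general pattern, assuming a hypothetical `CleanupSketch.closeAll` helper (not code from this PR), where the first failure is kept and later ones are attached via `Throwable.addSuppressed`:

```java
import java.util.List;

/** Hypothetical helper illustrating the pattern; names are not taken from Beam. */
final class CleanupSketch {

  /** Closes every resource, even if earlier ones fail, then rethrows the first failure. */
  static void closeAll(List<AutoCloseable> resources) throws Exception {
    Exception exception = null;
    for (AutoCloseable resource : resources) {
      try {
        resource.close();
      } catch (Exception e) {
        if (exception == null) {
          exception = e; // first failure becomes the primary exception
        } else {
          exception.addSuppressed(e); // later failures are attached, not lost
        }
      }
    }
    if (exception != null) {
      throw exception;
    }
  }
}
```

The point of the pattern is that an exception in one cleanup step never prevents the remaining steps from running.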


[GitHub] [beam] tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r391207665
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -352,20 +407,18 @@ public RemoteBundle getBundle(
         // The blocking queue of caches for serving multiple bundles concurrently.
         currentCache = availableCaches.take();
         client = currentCache.getUnchecked(executableStage.getEnvironment());
-        client.ref();
 
 Review comment:
   Below, `preparedClients.keySet().removeIf(c -> c.bundleRefCount.get() <= 0);` removes everything that isn't referenced. These two statements are logically one unit and hence I prefer to not scatter them:
   ```
   client = currentCache.getUnchecked(executableStage.getEnvironment());
   client.ref();
   ```


[GitHub] [beam] tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r389992620
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -252,14 +263,47 @@ public StageBundleFactory forStage(ExecutableStage executableStage) {
   }
 
   @Override
-  public void close() throws Exception {
-    // Clear the cache. This closes all active environments.
-    // note this may cause open calls to be cancelled by the peer
-    for (LoadingCache<Environment, WrappedSdkHarnessClient> environmentCache : environmentCaches) {
-      environmentCache.invalidateAll();
-      environmentCache.cleanUp();
+  public synchronized void close() throws Exception {
+    if (closed) {
+      return;
+    }
+    Exception exception = null;
 
 Review comment:
   It may be good to add an explanation here for the following elaborate cleanup logic as otherwise there is the danger it will be "simplified" in the future.


[GitHub] [beam] mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r391256990
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -352,20 +407,18 @@ public RemoteBundle getBundle(
         // The blocking queue of caches for serving multiple bundles concurrently.
         currentCache = availableCaches.take();
         client = currentCache.getUnchecked(executableStage.getEnvironment());
-        client.ref();
 
 Review comment:
   Yes, that makes sense. I've already reverted the change.
   
   I suppose there is a race condition where we retrieve an environment X and before we can call `ref()` on it, we evict the environment X, close all its references, and shut it down. This will result in a job restart.


[GitHub] [beam] tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r389988716
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -166,11 +168,20 @@ public static DefaultJobBundleFactory create(
         CacheBuilder.newBuilder()
             .removalListener(
                 (RemovalNotification<Environment, WrappedSdkHarnessClient> notification) -> {
-                  int refCount = notification.getValue().unref();
-                  LOG.debug(
-                      "Removed environment {} with {} remaining bundle references.",
-                      notification.getKey(),
-                      refCount);
+                  WrappedSdkHarnessClient client = notification.getValue();
 
 Review comment:
   Having this as part of the removal listener would prematurely close an environment that is still referenced. The purpose of the refcount is to be able to remove the environment from the cache when it expires but close it only after all references are gone. 
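
The separation described above (remove from the cache on expiry, close only once the last reference is gone) might look like the following minimal sketch. `RefCountedEnvironment` is a hypothetical stand-in, not Beam's actual client class, and it assumes that the cache itself holds one reference until the entry is evicted.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a cached SDK harness client; not Beam's actual class. */
final class RefCountedEnvironment {
  // Assumption: starts at 1 because the cache holds one reference until eviction.
  private final AtomicInteger refCount = new AtomicInteger(1);
  private volatile boolean closed = false;

  /** A bundle starts using the environment. */
  int ref() {
    return refCount.incrementAndGet();
  }

  /** A bundle (or the cache) releases the environment. */
  int unref() {
    int remaining = refCount.decrementAndGet();
    if (remaining == 0) {
      closed = true; // last reference released: now it is safe to shut down
    }
    return remaining;
  }

  /** What a cache removal listener would call on expiration. */
  int onEvicted() {
    return unref();
  }

  boolean isClosed() {
    return closed;
  }
}
```

If a bundle still holds a reference when `onEvicted()` runs, the environment stays open and is only closed by that bundle's later `unref()`.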


[GitHub] [beam] mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r390963992
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -352,20 +407,18 @@ public RemoteBundle getBundle(
         // The blocking queue of caches for serving multiple bundles concurrently.
         currentCache = availableCaches.take();
         client = currentCache.getUnchecked(executableStage.getEnvironment());
-        client.ref();
 
 Review comment:
   I checked again and couldn't see how this change alters the behavior. In any case, I don't mind moving it back to unblock this PR.


[GitHub] [beam] tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r391209772
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -352,20 +407,18 @@ public RemoteBundle getBundle(
         // The blocking queue of caches for serving multiple bundles concurrently.
         currentCache = availableCaches.take();
         client = currentCache.getUnchecked(executableStage.getEnvironment());
-        client.ref();
 
         currentClient = preparedClients.get(client);
         if (currentClient == null) {
           // we are using this client for the first time
           preparedClients.put(client, currentClient = prepare(client, executableStage));
           // cleanup any expired clients
-          preparedClients.keySet().removeIf(c -> c.bundleRefCount.get() == 0);
+          preparedClients.keySet().removeIf(c -> c.bundleRefCount.get() <= 0);
 
 Review comment:
   On the flip side, this could hide a bug in the reference counting. That could be covered with tests though. Not important for this PR.


[GitHub] [beam] mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r390966261
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java
 ##########
 @@ -186,31 +186,28 @@ private void stopProcess(String id, Process process) {
       LOG.debug("Attempting to stop process with id {}", id);
       // first try to kill gracefully
       process.destroy();
-      long maxTimeToWait = 2000;
-      if (waitForProcessToDie(process, maxTimeToWait)) {
-        LOG.debug("Process for worker {} shut down gracefully.", id);
-      } else {
-        LOG.info("Process for worker {} still running. Killing.", id);
-        process.destroyForcibly();
+      long maxTimeToWait = 500;
 
 Review comment:
   If you want, I can restore the old timeout, but I would then also change the code to make the stopping async, or at least stop all the processes at once and then wait (instead of tearing down the processes one by one and waiting for each process to quit).


[GitHub] [beam] tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r390475089
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -352,20 +407,18 @@ public RemoteBundle getBundle(
         // The blocking queue of caches for serving multiple bundles concurrently.
         currentCache = availableCaches.take();
         client = currentCache.getUnchecked(executableStage.getEnvironment());
-        client.ref();
 
 Review comment:
   `client.ref` needs to remain here, the lines below rely on that and it is also more readable.


[GitHub] [beam] mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r391257741
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -487,13 +553,14 @@ private int ref() {
     }
 
     private int unref() {
-      int count = bundleRefCount.decrementAndGet();
-      if (count == 0) {
+      int refCount = bundleRefCount.decrementAndGet();
+      Preconditions.checkState(refCount >= 0, "Reference count must not be negative.");
 
 Review comment:
   FYI, I've added this check instead to check for correct bounds.


[GitHub] [beam] mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r390482190
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -352,20 +407,18 @@ public RemoteBundle getBundle(
         // The blocking queue of caches for serving multiple bundles concurrently.
         currentCache = availableCaches.take();
         client = currentCache.getUnchecked(executableStage.getEnvironment());
-        client.ref();
 
 Review comment:
   Please explain which lines rely on it.
   
   Readability is highly subjective. I find it more readable not to duplicate the same code in two branches, as is currently the case (which is potentially error-prone).


[GitHub] [beam] tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r389991139
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java
 ##########
 @@ -234,9 +231,7 @@ public void run() {
               // Graceful shutdown period
               Thread.sleep(200);
               break;
-            } catch (InterruptedException e) {
-              Thread.currentThread().interrupt();
-              throw new RuntimeException(e);
+            } catch (InterruptedException ignored) {
 
 Review comment:
   Add comment on why the exception is ignored.


[GitHub] [beam] mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r390961590
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java
 ##########
 @@ -186,31 +186,28 @@ private void stopProcess(String id, Process process) {
       LOG.debug("Attempting to stop process with id {}", id);
       // first try to kill gracefully
       process.destroy();
-      long maxTimeToWait = 2000;
-      if (waitForProcessToDie(process, maxTimeToWait)) {
-        LOG.debug("Process for worker {} shut down gracefully.", id);
-      } else {
-        LOG.info("Process for worker {} still running. Killing.", id);
-        process.destroyForcibly();
+      long maxTimeToWait = 500;
 
 Review comment:
   The shutdown code is synchronous, so I'd prefer a shorter timeout here. It would be a good improvement to make it async.
   
   I've tested this on a cluster with many runs and I've not seen a single instance lingering. Also I did not notice any difference in the logs. The environment will be torn down last, after all connections have been closed. So failures would not be visible anymore.


[GitHub] [beam] mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r390235549
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -166,11 +168,20 @@ public static DefaultJobBundleFactory create(
         CacheBuilder.newBuilder()
             .removalListener(
                 (RemovalNotification<Environment, WrappedSdkHarnessClient> notification) -> {
-                  int refCount = notification.getValue().unref();
-                  LOG.debug(
-                      "Removed environment {} with {} remaining bundle references.",
-                      notification.getKey(),
-                      refCount);
+                  WrappedSdkHarnessClient client = notification.getValue();
 
 Review comment:
   You are right that we can't close the environment during removal because of pending references held by still-processing bundles. Please take a look at the follow-up. I'm now quarantining clients which still hold references, so that they are removed during shutdown if they have not been dereferenced before.
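
A rough sketch of the quarantine idea mentioned above, written with plain Java collections. All names (`QuarantineSketch`, `Client`, `onEvicted`, `release`, `shutdown`) are invented for illustration and are not taken from the PR; the sketch also assumes the cache holds one reference per client.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of the quarantine idea; class and method names are invented. */
final class QuarantineSketch {

  static final class Client {
    final AtomicInteger refs = new AtomicInteger(1); // assumption: 1 = held by the cache
    volatile boolean closed;

    void close() {
      closed = true;
    }
  }

  private final Set<Client> quarantined = ConcurrentHashMap.newKeySet();

  /** Called when the cache evicts a client. */
  void onEvicted(Client client) {
    if (client.refs.decrementAndGet() > 0) {
      quarantined.add(client); // bundles still running: defer the close
    } else {
      client.close();
    }
  }

  /** Called when a bundle finishes with a client. */
  void release(Client client) {
    if (client.refs.decrementAndGet() == 0) {
      quarantined.remove(client);
      client.close();
    }
  }

  /** Factory shutdown: close whatever was never dereferenced. */
  void shutdown() {
    for (Client client : quarantined) {
      client.close();
    }
    quarantined.clear();
  }

  int quarantinedCount() {
    return quarantined.size();
  }
}
```

Eviction never closes a referenced client, while `shutdown()` acts as a safety net for references that were never released, for example because closing a bundle failed.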


[GitHub] [beam] tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r391235877
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -352,20 +407,18 @@ public RemoteBundle getBundle(
         // The blocking queue of caches for serving multiple bundles concurrently.
         currentCache = availableCaches.take();
         client = currentCache.getUnchecked(executableStage.getEnvironment());
-        client.ref();
 
 Review comment:
   > It's not true that a later `ref()` introduces a bug for `preparedClients.keySet().removeIf(c -> c.bundleRefCount.get() <= 0);` because the refcount will be >0, otherwise we wouldn't be able to retrieve the client from the cache.
   
   The cache and environment are shared between executable stages, so the refcount can become 0 with concurrent eviction and release. That actually raises the question of whether these two statements should be atomic.
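
One common way to make "look up, then reference" safe against a concurrent release is a conditional increment that refuses to revive a count that has already reached zero. The following is a sketch of that general technique only; `GuardedRefCount` and `tryRef` are hypothetical names, and nothing here claims to be how Beam resolves the question.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch; Beam's client wrapper may handle this differently. */
final class GuardedRefCount {
  private final AtomicInteger count = new AtomicInteger(1);

  /** Takes a reference only if the count has not already dropped to zero. */
  boolean tryRef() {
    while (true) {
      int current = count.get();
      if (current <= 0) {
        return false; // already released; the caller must load a fresh client
      }
      if (count.compareAndSet(current, current + 1)) {
        return true;
      }
      // Lost a race with another thread; re-read and retry.
    }
  }

  /** Releases a reference and fails loudly on unbalanced calls. */
  int unref() {
    int remaining = count.decrementAndGet();
    if (remaining < 0) {
      throw new IllegalStateException("Reference count must not be negative.");
    }
    return remaining;
  }
}
```

A caller that gets `false` from `tryRef()` would have to fetch a new client from the cache instead of using one that is already being shut down.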


[GitHub] [beam] tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r389984708
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -166,11 +168,20 @@ public static DefaultJobBundleFactory create(
         CacheBuilder.newBuilder()
             .removalListener(
                 (RemovalNotification<Environment, WrappedSdkHarnessClient> notification) -> {
-                  int refCount = notification.getValue().unref();
-                  LOG.debug(
-                      "Removed environment {} with {} remaining bundle references.",
-                      notification.getKey(),
-                      refCount);
+                  WrappedSdkHarnessClient client = notification.getValue();
+                  int refCount = client.unref();
+                  // Double-check to trigger closing of all environments in case the "refing" does
+                  // not clean them up during operator shutdown. This is necessary in some
+                  // situations, e.g when the bundle cannot be closed and thus the ref cannot be
+                  // released. All environment types ensure they can only be closed once.
+                  if (refCount > 0) {
+                    LOG.warn(
+                        "Clearing remaining {} bundle references from environment {} to ensure it shuts down.",
+                        refCount,
+                        notification.getKey());
+                    //noinspection StatementWithEmptyBody
 
 Review comment:
   Why is this needed (with the log statement above)? 


[GitHub] [beam] mxm commented on issue #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
mxm commented on issue #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#issuecomment-597629390
 
 
   Thanks for your comments @tweise. Let me know if you have more questions.


[GitHub] [beam] tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r389990103
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java
 ##########
 @@ -186,31 +186,28 @@ private void stopProcess(String id, Process process) {
       LOG.debug("Attempting to stop process with id {}", id);
       // first try to kill gracefully
       process.destroy();
-      long maxTimeToWait = 2000;
-      if (waitForProcessToDie(process, maxTimeToWait)) {
-        LOG.debug("Process for worker {} shut down gracefully.", id);
-      } else {
-        LOG.info("Process for worker {} still running. Killing.", id);
-        process.destroyForcibly();
+      long maxTimeToWait = 500;
 
 Review comment:
   Why the timeout change?


[GitHub] [beam] mxm commented on issue #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
mxm commented on issue #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#issuecomment-596823759
 
 
   retest this please


[GitHub] [beam] tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r390480166
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java
 ##########
 @@ -186,31 +186,28 @@ private void stopProcess(String id, Process process) {
       LOG.debug("Attempting to stop process with id {}", id);
       // first try to kill gracefully
       process.destroy();
-      long maxTimeToWait = 2000;
-      if (waitForProcessToDie(process, maxTimeToWait)) {
-        LOG.debug("Process for worker {} shut down gracefully.", id);
-      } else {
-        LOG.info("Process for worker {} still running. Killing.", id);
-        process.destroyForcibly();
+      long maxTimeToWait = 500;
 
 Review comment:
   How did that manifest? 500ms timeout for process termination seems too aggressive. I would prefer a longer timeout to allow for connections to be closed gracefully.
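
The trade-off in this comment can be sketched as follows. This is a minimal illustration, not the actual `ProcessManager` code: the class `GracefulStop`, the method `stop`, and the `graceMillis` parameter are hypothetical names. It requests termination, waits up to a configurable grace period, and kills the process forcibly only if it is still running afterwards.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not Beam's ProcessManager): graceful-then-forcible shutdown. */
final class GracefulStop {
  private GracefulStop() {}

  /**
   * Requests termination, waits up to graceMillis, then kills forcibly.
   *
   * @return true if the process exited within the grace period
   */
  static boolean stop(Process process, long graceMillis) throws InterruptedException {
    // First ask the process to terminate; how gracefully this happens is platform dependent.
    process.destroy();
    if (process.waitFor(graceMillis, TimeUnit.MILLISECONDS)) {
      return true;
    }
    // Grace period elapsed: kill the process and wait until it is gone.
    process.destroyForcibly().waitFor();
    return false;
  }
}
```

A longer `graceMillis` gives the worker more time to close its connections, at the cost of a slower shutdown when the process hangs.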


[GitHub] [beam] mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r389998456
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironment.java
 ##########
 @@ -64,13 +63,32 @@ public InstructionRequestHandler getInstructionRequestHandler() {
   }
 
   @Override
-  public void close() throws Exception {
-    synchronized (lock) {
-      if (!isClosed) {
-        instructionHandler.close();
-        processManager.stopProcess(workerId);
-        isClosed = true;
+  public synchronized void close() throws Exception {
+    if (isClosed) {
+      return;
+    }
+    Exception exception = null;
+    try {
+      processManager.stopProcess(workerId);
+    } catch (Exception e) {
+      if (exception != null) {
 
 Review comment:
   I intentionally didn't do that. If you move the try blocks around to change the order, this will break. I'd prefer to be defensive.
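
The defensive pattern described here can be sketched as below. It is an illustration under assumptions, not the actual `ProcessEnvironment`: the class `DefensiveClose` and its two `AutoCloseable` fields are hypothetical stand-ins for the process and the instruction handler, and setting `isClosed` before cleanup is a choice of this sketch. Every cleanup step runs even if an earlier one throws, and each catch block carries the same null check so that the blocks can be reordered safely.

```java
/** Hypothetical stand-in for an environment with two resources to release. */
final class DefensiveClose implements AutoCloseable {
  private final AutoCloseable process;
  private final AutoCloseable handler;
  private boolean isClosed;

  DefensiveClose(AutoCloseable process, AutoCloseable handler) {
    this.process = process;
    this.handler = handler;
  }

  @Override
  public synchronized void close() throws Exception {
    if (isClosed) {
      return;
    }
    // Assumption of this sketch: mark closed first so a failed close is not retried.
    isClosed = true;
    Exception exception = null;
    try {
      process.close();
    } catch (Exception e) {
      // Always null in the first block today; the check keeps it correct after reordering.
      if (exception != null) {
        exception.addSuppressed(e);
      } else {
        exception = e;
      }
    }
    try {
      handler.close();
    } catch (Exception e) {
      if (exception != null) {
        exception.addSuppressed(e);
      } else {
        exception = e;
      }
    }
    if (exception != null) {
      throw exception;
    }
  }
}
```

The first failure is rethrown after all steps have run, and later failures are attached to it as suppressed exceptions.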


[GitHub] [beam] tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r391242142
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -352,20 +407,18 @@ public RemoteBundle getBundle(
         // The blocking queue of caches for serving multiple bundles concurrently.
         currentCache = availableCaches.take();
         client = currentCache.getUnchecked(executableStage.getEnvironment());
-        client.ref();
 
         currentClient = preparedClients.get(client);
         if (currentClient == null) {
           // we are using this client for the first time
           preparedClients.put(client, currentClient = prepare(client, executableStage));
           // cleanup any expired clients
-          preparedClients.keySet().removeIf(c -> c.bundleRefCount.get() == 0);
+          preparedClients.keySet().removeIf(c -> c.bundleRefCount.get() <= 0);
 
 Review comment:
   Not important, can stay as is. What would be good to check is that the environment expiration still works.


[GitHub] [beam] mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r389998590
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -166,11 +168,20 @@ public static DefaultJobBundleFactory create(
         CacheBuilder.newBuilder()
             .removalListener(
                 (RemovalNotification<Environment, WrappedSdkHarnessClient> notification) -> {
-                  int refCount = notification.getValue().unref();
-                  LOG.debug(
-                      "Removed environment {} with {} remaining bundle references.",
-                      notification.getKey(),
-                      refCount);
+                  WrappedSdkHarnessClient client = notification.getValue();
+                  int refCount = client.unref();
+                  // Double-check to trigger closing of all environments in case the "refing" does
+                  // not clean them up during operator shutdown. This is necessary in some
+                  // situations, e.g when the bundle cannot be closed and thus the ref cannot be
+                  // released. All environment types ensure they can only be closed once.
+                  if (refCount > 0) {
+                    LOG.warn(
+                        "Clearing remaining {} bundle references from environment {} to ensure it shuts down.",
+                        refCount,
+                        notification.getKey());
+                    //noinspection StatementWithEmptyBody
 
 Review comment:
   It is just there to inform us when there were still references. The `noinspection` comment suppresses a warning on the empty while loop which counts down the refs.


[GitHub] [beam] mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r390483041
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -352,20 +407,18 @@ public RemoteBundle getBundle(
         // The blocking queue of caches for serving multiple bundles concurrently.
         currentCache = availableCaches.take();
         client = currentCache.getUnchecked(executableStage.getEnvironment());
-        client.ref();
 
         currentClient = preparedClients.get(client);
         if (currentClient == null) {
           // we are using this client for the first time
           preparedClients.put(client, currentClient = prepare(client, executableStage));
           // cleanup any expired clients
-          preparedClients.keySet().removeIf(c -> c.bundleRefCount.get() == 0);
+          preparedClients.keySet().removeIf(c -> c.bundleRefCount.get() <= 0);
 
 Review comment:
   I prefer including the entire range whenever I write such checks. It is just a way to guard against future changes where we might count down after the value has reached zero.
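
A small sketch of why the wider range matters. The class `ExpiredClientCleanup` and the map of counters are hypothetical and only mimic the `removeIf` call in the diff. An entry whose count was decremented once too often would survive a `== 0` check forever, while `<= 0` still removes it.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical cleanup over a map of reference counts, keyed by client id. */
final class ExpiredClientCleanup {
  private ExpiredClientCleanup() {}

  /** Removes every entry whose count is zero or negative; returns how many were removed. */
  static int removeExpired(Map<String, AtomicInteger> refCounts) {
    int before = refCounts.size();
    // "<= 0" also catches counts that dropped below zero by mistake.
    refCounts.values().removeIf(count -> count.get() <= 0);
    return before - refCounts.size();
  }
}
```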


[GitHub] [beam] mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r391247675
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -375,6 +428,11 @@ public RemoteBundle getBundle(
         }
       }
 
+      // Ensure client is referenced for this bundle, unref in close()
+      client.ref();
+      // Cleanup list of clients which were active during eviction but now do not hold references
+      evictedActiveClients.removeIf(c -> c.bundleRefCount.get() <= 0);
 
 Review comment:
   I've improved this to only remove when environment expiration is used.


[GitHub] [beam] mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r389999107
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java
 ##########
 @@ -234,9 +231,7 @@ public void run() {
               // Graceful shutdown period
               Thread.sleep(200);
               break;
-            } catch (InterruptedException e) {
-              Thread.currentThread().interrupt();
-              throw new RuntimeException(e);
+            } catch (InterruptedException ignored) {
 
 Review comment:
   Will do. I'm ignoring it because there is no point in throwing it here; we want to shut down the processes once the grace period has expired.
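
For illustration, a grace-period wait that swallows the interrupt might look like the sketch below. The class `ShutdownGrace` and the method `awaitGracePeriod` are hypothetical names, not part of `ProcessManager`. The caller is assumed to go on and kill any remaining processes whether or not the wait completed.

```java
/** Hypothetical helper for a shutdown hook: waits out a grace period without throwing. */
final class ShutdownGrace {
  private ShutdownGrace() {}

  /** Sleeps for the grace period; returns false if interrupted, but never throws. */
  static boolean awaitGracePeriod(long millis) {
    try {
      Thread.sleep(millis);
      return true;
    } catch (InterruptedException ignored) {
      // Deliberately not rethrown: the remaining processes are stopped either way.
      return false;
    }
  }
}
```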


[GitHub] [beam] mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084#discussion_r390003729
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##########
 @@ -166,11 +168,20 @@ public static DefaultJobBundleFactory create(
         CacheBuilder.newBuilder()
             .removalListener(
                 (RemovalNotification<Environment, WrappedSdkHarnessClient> notification) -> {
-                  int refCount = notification.getValue().unref();
-                  LOG.debug(
-                      "Removed environment {} with {} remaining bundle references.",
-                      notification.getKey(),
-                      refCount);
+                  WrappedSdkHarnessClient client = notification.getValue();
 
 Review comment:
   It doesn't work though if we do not ensure dereferencing under all circumstances. We need a safeguard here, also considering other runners may not dereference correctly. Generally, it is hard to guarantee dereferencing due to the nesting of DoFnRunners which may not even allow closing the bundle in error cases. I considered not doing this but I think it is the safer route.
   
   If you take a step back, when would the reference counting really be useful? Every restarted job will run in a new classloader anyway, so the environment will never be recycled. When we call close we should tear down everything.
   
   Taking another step back, the reference counting should really be removed entirely. It was error-prone from the beginning, leading to subtle problems with dereferencing. If you don't mind, I'd remove it. What do you think?
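
The safeguard discussed in this comment can be sketched roughly as follows. This is a simplified model, not the actual `WrappedSdkHarnessClient` or the Guava removal listener: the class `RefCountedClient`, the method `onEvicted`, and the initial count of 1 (a reference held by the cache) are assumptions of the sketch. On eviction the cache's reference is released; if bundles leaked references, the loop counts them down so the client still closes.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical reference-counted client, loosely modelled on the code under review. */
final class RefCountedClient {
  // Assumption: the count starts at 1 for the reference held by the cache.
  private final AtomicInteger bundleRefCount = new AtomicInteger(1);
  private final AtomicBoolean closed = new AtomicBoolean();

  int ref() {
    return bundleRefCount.incrementAndGet();
  }

  /** Releases one reference and closes the client once none remain. */
  int unref() {
    int count = bundleRefCount.decrementAndGet();
    if (count <= 0) {
      // Closing is idempotent, so repeated calls are harmless.
      closed.set(true);
    }
    return count;
  }

  boolean isClosed() {
    return closed.get();
  }

  /** Called on eviction; returns the number of references that had to be cleared. */
  static int onEvicted(RefCountedClient client) {
    int remaining = client.unref();
    if (remaining > 0) {
      // Safeguard: drain leaked references so the environment still shuts down.
      //noinspection StatementWithEmptyBody
      while (client.unref() > 0) {}
    }
    return Math.max(remaining, 0);
  }
}
```

Once eviction always forces the count to zero, the counter no longer decides when the client is closed, which is the argument made above for removing it.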


[GitHub] [beam] tweise merged pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment

Posted by GitBox <gi...@apache.org>.
tweise merged pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
URL: https://github.com/apache/beam/pull/11084
 
 
   
