You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/06 22:39:31 UTC

[pulsar] branch master updated: On shutdown, kill the function instance thread after interrupt attempt (#3749)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 69faeaf  On shutdown, kill the function instance thread after interrupt attempt (#3749)
69faeaf is described below

commit 69faeaf80a70947fca18e36d47cdf7540560d637
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Mar 6 14:39:26 2019 -0800

    On shutdown, kill the function instance thread after interrupt attempt (#3749)
---
 .../org/apache/pulsar/functions/runtime/ThreadRuntime.java | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index ad1002c..93246f2 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -44,6 +44,8 @@ class ThreadRuntime implements Runtime {
     // The thread that invokes the function
     private Thread fnThread;
 
+    private static final int THREAD_SHUTDOWN_TIMEOUT_MILLIS = 10_000;
+
     @Getter
     private InstanceConfig instanceConfig;
     private JavaInstanceRunnable javaInstanceRunnable;
@@ -114,13 +116,19 @@ class ThreadRuntime implements Runtime {
         }
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void stop() {
         if (fnThread != null) {
             // interrupt the instance thread
             fnThread.interrupt();
             try {
-                fnThread.join();
+                // If the instance thread doesn't respond within some time, attempt to
+                // kill the thread
+                fnThread.join(THREAD_SHUTDOWN_TIMEOUT_MILLIS, 0);
+                if (fnThread.isAlive()) {
+                    fnThread.stop();
+                }
             } catch (InterruptedException e) {
                 // ignore this
             }
@@ -152,8 +160,8 @@ class ThreadRuntime implements Runtime {
     public CompletableFuture<InstanceCommunication.MetricsData> getAndResetMetrics() {
         return CompletableFuture.completedFuture(javaInstanceRunnable.getAndResetMetrics());
     }
-    
-    
+
+
     @Override
     public CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int instanceId) {
         return CompletableFuture.completedFuture(javaInstanceRunnable.getMetrics());