You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/06 22:39:31 UTC
[pulsar] branch master updated: On shutdown, kill the function instance thread after interrupt attempt (#3749)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 69faeaf On shutdown, kill the function instance thread after interrupt attempt (#3749)
69faeaf is described below
commit 69faeaf80a70947fca18e36d47cdf7540560d637
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Mar 6 14:39:26 2019 -0800
On shutdown, kill the function instance thread after interrupt attempt (#3749)
---
.../org/apache/pulsar/functions/runtime/ThreadRuntime.java | 14 +++++++++++---
1 file changed, 11 insertions(+), 3 deletions(-)
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index ad1002c..93246f2 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -44,6 +44,8 @@ class ThreadRuntime implements Runtime {
// The thread that invokes the function
private Thread fnThread;
+ private static final int THREAD_SHUTDOWN_TIMEOUT_MILLIS = 10_000;
+
@Getter
private InstanceConfig instanceConfig;
private JavaInstanceRunnable javaInstanceRunnable;
@@ -114,13 +116,19 @@ class ThreadRuntime implements Runtime {
}
}
+ @SuppressWarnings("deprecation")
@Override
public void stop() {
if (fnThread != null) {
// interrupt the instance thread
fnThread.interrupt();
try {
- fnThread.join();
+ // If the instance thread doesn't respond within some time, attempt to
+ // kill the thread
+ fnThread.join(THREAD_SHUTDOWN_TIMEOUT_MILLIS, 0);
+ if (fnThread.isAlive()) {
+ fnThread.stop();
+ }
} catch (InterruptedException e) {
// ignore this
}
@@ -152,8 +160,8 @@ class ThreadRuntime implements Runtime {
public CompletableFuture<InstanceCommunication.MetricsData> getAndResetMetrics() {
return CompletableFuture.completedFuture(javaInstanceRunnable.getAndResetMetrics());
}
-
-
+
+
@Override
public CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int instanceId) {
return CompletableFuture.completedFuture(javaInstanceRunnable.getMetrics());