You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2019/03/28 23:23:34 UTC
[pulsar] branch master updated: Cleanup logic in
JavaInstanceRunnable close method (#3932)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5740699 Cleanup logic in JavaInstanceRunnable close method (#3932)
5740699 is described below
commit 5740699b834439651dc57c87edac2a3002202921
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Thu Mar 28 16:23:30 2019 -0700
Cleanup logic in JavaInstanceRunnable close method (#3932)
* Cleanup logic in JavaInstanceRunnable close method
* Added comments
---
.../functions/instance/JavaInstanceRunnable.java | 25 ++++++++++++++++------
1 file changed, 18 insertions(+), 7 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index e210eb6..8a60537 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -456,11 +456,16 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
return record;
}
+ /**
+ * NOTE: this method is be syncrhonized because it is potentially called by two different places
+ * one inside the run/finally clause and one inside the ThreadRuntime::stop
+ */
@Override
- public void close() {
+ synchronized public void close() {
if (stats != null) {
stats.close();
+ stats = null;
}
if (source != null) {
@@ -468,8 +473,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
source.close();
} catch (Throwable e) {
log.error("Failed to close source {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
-
}
+ source = null;
}
if (sink != null) {
@@ -478,10 +483,12 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
} catch (Throwable e) {
log.error("Failed to close sink {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
}
+ sink = null;
}
if (null != javaInstance) {
javaInstance.close();
+ javaInstance = null;
}
// kill the state table
@@ -495,13 +502,17 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
log.warn("Failed to close state storage client", cause);
return null;
});
+ storageClient = null;
}
- // once the thread quits, clean up the instance
- fnCache.unregisterFunctionInstance(
- instanceConfig.getFunctionId(),
- instanceConfig.getInstanceName());
- log.info("Unloading JAR files for function {}", instanceConfig);
+ if (instanceCache != null) {
+ // once the thread quits, clean up the instance
+ fnCache.unregisterFunctionInstance(
+ instanceConfig.getFunctionId(),
+ instanceConfig.getInstanceName());
+ log.info("Unloading JAR files for function {}", instanceConfig);
+ instanceCache = null;
+ }
}
public InstanceCommunication.MetricsData getAndResetMetrics() {