You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/10/05 16:00:52 UTC
[pulsar] branch master updated: [function][runtime] NPE at
RuntimeSpawner (#2728)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ea503da [function][runtime] NPE at RuntimeSpawner (#2728)
ea503da is described below
commit ea503da4b279d04de13ed676291e1b943f9f973f
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Fri Oct 5 09:00:39 2018 -0700
[function][runtime] NPE at RuntimeSpawner (#2728)
*Motivation*
getFunctionStat call can happen before spanwer starts the runtime
*Changes*
Add null checks and also change the order of closing sequence in RuntimeSpawner
---
.../apache/pulsar/functions/runtime/RuntimeSpawner.java | 17 ++++++++++++-----
1 file changed, 12 insertions(+), 5 deletions(-)
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index 423ccd9..5688411 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -32,6 +32,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
@@ -82,7 +83,8 @@ public class RuntimeSpawner implements AutoCloseable {
processLivenessCheckTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
- if (!runtime.isAlive()) {
+ Runtime runtime = RuntimeSpawner.this.runtime;
+ if (runtime != null && !runtime.isAlive()) {
log.error("{}/{}/{}-{} Function Container is dead with exception.. restarting", details.getTenant(),
details.getNamespace(), details.getName(), runtime.getDeathException());
// Just for the sake of sanity, just destroy the runtime
@@ -108,6 +110,10 @@ public class RuntimeSpawner implements AutoCloseable {
}
public CompletableFuture<FunctionStatus> getFunctionStatus(int instanceId) {
+ Runtime runtime = this.runtime;
+ if (null == runtime) {
+ return FutureUtil.failedFuture(new IllegalStateException("Function runtime is not started yet"));
+ }
return runtime.getFunctionStatus(instanceId).thenApply(f -> {
FunctionStatus.Builder builder = FunctionStatus.newBuilder();
builder.mergeFrom(f).setNumRestarts(numRestarts).setInstanceId(String.valueOf(instanceId));
@@ -131,6 +137,11 @@ public class RuntimeSpawner implements AutoCloseable {
@Override
public void close() {
+ // cancel liveness checker before stopping runtime.
+ if (processLivenessCheckTimer != null) {
+ processLivenessCheckTimer.cancel();
+ processLivenessCheckTimer = null;
+ }
if (null != runtime) {
try {
runtime.stop();
@@ -139,9 +150,5 @@ public class RuntimeSpawner implements AutoCloseable {
}
runtime = null;
}
- if (processLivenessCheckTimer != null) {
- processLivenessCheckTimer.cancel();
- processLivenessCheckTimer = null;
- }
}
}