You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/10/05 16:00:52 UTC

[pulsar] branch master updated: [function][runtime] NPE at RuntimeSpawner (#2728)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ea503da  [function][runtime] NPE at RuntimeSpawner (#2728)
ea503da is described below

commit ea503da4b279d04de13ed676291e1b943f9f973f
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Fri Oct 5 09:00:39 2018 -0700

    [function][runtime] NPE at RuntimeSpawner (#2728)
    
    *Motivation*
    
    A getFunctionStatus call can happen before the spawner starts the runtime, which results in an NPE
    
    *Changes*
    
    Add null checks and reorder the closing sequence in RuntimeSpawner so that the liveness checker is cancelled before the runtime is stopped
---
 .../apache/pulsar/functions/runtime/RuntimeSpawner.java | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index 423ccd9..5688411 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -32,6 +32,7 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
@@ -82,7 +83,8 @@ public class RuntimeSpawner implements AutoCloseable {
             processLivenessCheckTimer.scheduleAtFixedRate(new TimerTask() {
                 @Override
                 public void run() {
-                    if (!runtime.isAlive()) {
+                    Runtime runtime = RuntimeSpawner.this.runtime;
+                    if (runtime != null && !runtime.isAlive()) {
                         log.error("{}/{}/{}-{} Function Container is dead with exception.. restarting", details.getTenant(),
                                 details.getNamespace(), details.getName(), runtime.getDeathException());
                         // Just for the sake of sanity, just destroy the runtime
@@ -108,6 +110,10 @@ public class RuntimeSpawner implements AutoCloseable {
     }
 
     public CompletableFuture<FunctionStatus> getFunctionStatus(int instanceId) {
+        Runtime runtime = this.runtime;
+        if (null == runtime) {
+            return FutureUtil.failedFuture(new IllegalStateException("Function runtime is not started yet"));
+        }
         return runtime.getFunctionStatus(instanceId).thenApply(f -> {
            FunctionStatus.Builder builder = FunctionStatus.newBuilder();
            builder.mergeFrom(f).setNumRestarts(numRestarts).setInstanceId(String.valueOf(instanceId));
@@ -131,6 +137,11 @@ public class RuntimeSpawner implements AutoCloseable {
 
     @Override
     public void close() {
+        // cancel liveness checker before stopping runtime.
+        if (processLivenessCheckTimer != null) {
+            processLivenessCheckTimer.cancel();
+            processLivenessCheckTimer = null;
+        }
         if (null != runtime) {
             try {
                 runtime.stop();
@@ -139,9 +150,5 @@ public class RuntimeSpawner implements AutoCloseable {
             }
             runtime = null;
         }
-        if (processLivenessCheckTimer != null) {
-            processLivenessCheckTimer.cancel();
-            processLivenessCheckTimer = null;
-        }
     }
 }