You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/05/22 18:32:29 UTC

[pulsar] branch master updated: Fix null pointer when getting function instance metrics. (#7010)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 20320c7  Fix null pointer when getting function instance metrics. (#7010)
20320c7 is described below

commit 20320c7ca15498f5655668e99cce286be381c76d
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri May 22 11:32:17 2020 -0700

    Fix null pointer when getting function instance metrics. (#7010)
    
    * Fix null pointer when getting function instance metrics.
    
    * Made more functions synchronized
    
    * Made the remaining public interface synchronized
    
    * Made the stats class's public methods synchronized so they are thread-safe.
    
    * Made setup synchronized so that it and close won't run together
    
    * Undid making stats synchronized until we resolve differences
    
    * Incorporated feedback
    
    Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
 .../functions/instance/JavaInstanceRunnable.java   | 75 ++++++++++++++--------
 .../functions/runtime/thread/ThreadRuntime.java    |  2 +-
 2 files changed, 51 insertions(+), 26 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 0fa4fc6..892c7e5 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -24,6 +24,8 @@ import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import io.netty.buffer.ByteBuf;
 import io.prometheus.client.CollectorRegistry;
+
+import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import lombok.AccessLevel;
 import lombok.Getter;
@@ -102,9 +104,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
 
     // provide tables for storing states
     private final String stateStorageServiceUrl;
-    @Getter(AccessLevel.PACKAGE)
     private StorageClient storageClient;
-    @Getter(AccessLevel.PACKAGE)
     private Table<ByteBuf, ByteBuf> stateTable;
 
     private JavaInstance javaInstance;
@@ -112,7 +112,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
     private Throwable deathException;
 
     // function stats
-    @Getter
     private ComponentStatsManager stats;
 
     private Record<?> currentRecord;
@@ -178,7 +177,17 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
     /**
      * NOTE: this method should be called in the instance thread, in order to make class loading work.
      */
-    JavaInstance setupJavaInstance() throws Exception {
+    synchronized private void setup() throws Exception {
+
+        this.instanceCache = InstanceCache.getInstanceCache();
+
+        if (this.collectorRegistry == null) {
+            this.collectorRegistry = new CollectorRegistry();
+        }
+        this.stats = ComponentStatsManager.getStatsManager(this.collectorRegistry, this.metricsLabels,
+                this.instanceCache.getScheduledExecutorService(),
+                this.componentType);
+
         // initialize the thread context
         ThreadContext.put("function", FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
         ThreadContext.put("functionname", instanceConfig.getFunctionDetails().getName());
@@ -218,7 +227,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         // start any log topic handler
         setupLogHandler();
 
-        return new JavaInstance(contextImpl, object, instanceConfig);
+        javaInstance = new JavaInstance(contextImpl, object, instanceConfig);
     }
 
     ContextImpl setupContext() {
@@ -234,16 +243,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
     @Override
     public void run() {
         try {
-            this.instanceCache = InstanceCache.getInstanceCache();
-
-            if (this.collectorRegistry == null) {
-                this.collectorRegistry = new CollectorRegistry();
-            }
-            this.stats = ComponentStatsManager.getStatsManager(this.collectorRegistry, this.metricsLabels,
-                    this.instanceCache.getScheduledExecutorService(),
-                    this.componentType);
-
-            javaInstance = setupJavaInstance();
+            setup();
+            
             while (true) {
                 currentRecord = readInput();
 
@@ -546,13 +547,32 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         }
     }
 
-    public InstanceCommunication.MetricsData getAndResetMetrics() {
-        InstanceCommunication.MetricsData metricsData = getMetrics();
-        stats.reset();
+    synchronized public String getStatsAsString() throws IOException {
+        if (stats != null) {
+            return stats.getStatsAsString();
+        } else {
+            return "";
+        }
+    }
+
+    // This method is synchronized because it is using the stats and javaInstance variables
+    synchronized public InstanceCommunication.MetricsData getAndResetMetrics() {
+        InstanceCommunication.MetricsData metricsData = internalGetMetrics();
+        internalResetMetrics();
         return metricsData;
     }
 
-    public InstanceCommunication.MetricsData getMetrics() {
+    // This method is synchronized because it is using the stats and javaInstance variables
+    synchronized public InstanceCommunication.MetricsData getMetrics() {
+        return internalGetMetrics();
+    }
+
+    // This method is synchronized because it is using the stats and javaInstance variables
+    synchronized public void resetMetrics() {
+        internalResetMetrics();
+    }
+
+    private InstanceCommunication.MetricsData internalGetMetrics() {
         InstanceCommunication.MetricsData.Builder bldr = createMetricsDataBuilder();
         if (javaInstance != null) {
             Map<String, Double> userMetrics =  javaInstance.getMetrics();
@@ -563,9 +583,13 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         return bldr.build();
     }
 
-    public void resetMetrics() {
-        stats.reset();
-        javaInstance.resetMetrics();
+    private void internalResetMetrics() {
+        if (stats != null) {
+            stats.reset();
+        }
+        if (javaInstance != null) {
+            javaInstance.resetMetrics();
+        }
     }
 
     private Builder createMetricsDataBuilder() {
@@ -588,7 +612,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         return bldr;
     }
 
-    public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
+    // This method is synchronized because it is using the stats variable
+    synchronized public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
         InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
         if (stats != null) {
             functionStatusBuilder.setNumReceived((long) stats.getTotalRecordsReceived());
@@ -643,7 +668,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         config.getRootLogger().removeAppender(logAppender.getName());
     }
 
-    public void setupInput(ContextImpl contextImpl) throws Exception {
+    private void setupInput(ContextImpl contextImpl) throws Exception {
 
         SourceSpec sourceSpec = this.instanceConfig.getFunctionDetails().getSource();
         Object object;
@@ -745,7 +770,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         }
     }
 
-    public void setupOutput(ContextImpl contextImpl) throws Exception {
+    private void setupOutput(ContextImpl contextImpl) throws Exception {
 
         SinkSpec sinkSpec = this.instanceConfig.getFunctionDetails().getSink();
         Object object;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
index ffbed6b..5cc74c5 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
@@ -175,7 +175,7 @@ public class ThreadRuntime implements Runtime {
 
     @Override
     public String getPrometheusMetrics() throws IOException {
-        return javaInstanceRunnable.getStats().getStatsAsString();
+        return javaInstanceRunnable.getStatsAsString();
     }
 
     @Override