You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/12/20 19:14:52 UTC

[pulsar] branch master updated: Allow stats operations not to be blocked in functions (#9005)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 31f7d70  Allow stats operations not to be blocked in functions (#9005)
31f7d70 is described below

commit 31f7d70bd4fdde4c8a272714dece8d806e31ebae
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sun Dec 20 11:14:18 2020 -0800

    Allow stats operations not to be blocked in functions (#9005)
    
    Co-authored-by: Jerry Peng <je...@splunk.com>
---
 .../functions/instance/JavaInstanceRunnable.java   | 133 +++++++++++++--------
 1 file changed, 85 insertions(+), 48 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index fa68656..fe476f7 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -29,6 +29,9 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import net.jodah.typetools.TypeResolver;
@@ -86,7 +89,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
 
     // input topic consumer & output topic producer
     private final PulsarClientImpl client;
-    //private final Map<String, PulsarClient> pulsarClientMap;
 
     private LogAppender logAppender;
 
@@ -122,6 +124,13 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
     private ClassLoader functionClassLoader;
     private String narExtractionDirectory;
 
+    // a flog to determine if member variables have been initialized as part of setup().
+    // used for out of band API calls like operations involving stats
+    private transient boolean isInitialized = false;
+
+    // a read write lock for stats operations
+    private ReadWriteLock statsLock = new ReentrantReadWriteLock();
+
     public JavaInstanceRunnable(InstanceConfig instanceConfig,
                                 FunctionCacheManager fnCache,
                                 String jarFile,
@@ -216,6 +225,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         setupLogHandler();
 
         javaInstance = new JavaInstance(contextImpl, object, instanceConfig);
+
+        // to signal member variables are initialized
+        isInitialized = true;
     }
 
     ContextImpl setupContext() {
@@ -404,12 +416,14 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
     }
 
     /**
-     * NOTE: this method is be syncrhonized because it is potentially called by two different places
+     * NOTE: this method is be synchronized because it is potentially called by two different places
      *       one inside the run/finally clause and one inside the ThreadRuntime::stop
      */
     @Override
     synchronized public void close() {
 
+        isInitialized = false;
+
         if (stats != null) {
             stats.close();
             stats = null;
@@ -466,49 +480,67 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         }
     }
 
-    synchronized public String getStatsAsString() throws IOException {
-        if (stats != null) {
-            return stats.getStatsAsString();
-        } else {
-            return "";
+    public String getStatsAsString() throws IOException {
+        if (isInitialized) {
+            try {
+                statsLock.readLock().lock();
+                return stats.getStatsAsString();
+            } finally {
+                statsLock.readLock().unlock();
+            }
         }
+        return "";
     }
 
-    // This method is synchronized because it is using the stats variable
-    synchronized public InstanceCommunication.MetricsData getAndResetMetrics() {
-        InstanceCommunication.MetricsData metricsData = internalGetMetrics();
-        internalResetMetrics();
-        return metricsData;
+    public InstanceCommunication.MetricsData getAndResetMetrics() {
+        if (isInitialized) {
+            try {
+                statsLock.writeLock().lock();
+                InstanceCommunication.MetricsData metricsData = internalGetMetrics();
+                internalResetMetrics();
+                return metricsData;
+            } finally {
+                statsLock.writeLock().unlock();
+            }
+        }
+        return InstanceCommunication.MetricsData.getDefaultInstance();
     }
 
-    // This method is synchronized because it is using the stats and javaInstance variables
-    synchronized public InstanceCommunication.MetricsData getMetrics() {
-        return internalGetMetrics();
+    public InstanceCommunication.MetricsData getMetrics() {
+        if (isInitialized) {
+            try {
+                statsLock.readLock().lock();
+                return internalGetMetrics();
+            } finally {
+                statsLock.readLock().unlock();
+            }
+        }
+        return InstanceCommunication.MetricsData.getDefaultInstance();
     }
 
-    // This method is synchronized because it is using the stats and javaInstance variables
-    synchronized public void resetMetrics() {
-        internalResetMetrics();
+    public void resetMetrics() {
+        if (isInitialized) {
+            try {
+                statsLock.writeLock().lock();
+                internalResetMetrics();
+            } finally {
+                statsLock.writeLock().unlock();
+            }
+        }
     }
 
     private InstanceCommunication.MetricsData internalGetMetrics() {
         InstanceCommunication.MetricsData.Builder bldr = createMetricsDataBuilder();
-        if (javaInstance != null) {
-            Map<String, Double> userMetrics =  javaInstance.getMetrics();
-            if (userMetrics != null) {
-                bldr.putAllUserMetrics(userMetrics);
-            }
+        Map<String, Double> userMetrics = javaInstance.getMetrics();
+        if (userMetrics != null) {
+            bldr.putAllUserMetrics(userMetrics);
         }
         return bldr.build();
     }
 
     private void internalResetMetrics() {
-        if (stats != null) {
             stats.reset();
-        }
-        if (javaInstance != null) {
             javaInstance.resetMetrics();
-        }
     }
 
     private Builder createMetricsDataBuilder() {
@@ -531,28 +563,33 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         return bldr;
     }
 
-    // This method is synchronized because it is using the stats variable
-    synchronized public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
+    public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
         InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
-        if (stats != null) {
-            functionStatusBuilder.setNumReceived((long) stats.getTotalRecordsReceived());
-            functionStatusBuilder.setNumSuccessfullyProcessed((long) stats.getTotalProcessedSuccessfully());
-            functionStatusBuilder.setNumUserExceptions((long) stats.getTotalUserExceptions());
-            stats.getLatestUserExceptions().forEach(ex -> {
-                functionStatusBuilder.addLatestUserExceptions(ex);
-            });
-            functionStatusBuilder.setNumSystemExceptions((long) stats.getTotalSysExceptions());
-            stats.getLatestSystemExceptions().forEach(ex -> {
-                functionStatusBuilder.addLatestSystemExceptions(ex);
-            });
-            stats.getLatestSourceExceptions().forEach(ex -> {
-                functionStatusBuilder.addLatestSourceExceptions(ex);
-            });
-            stats.getLatestSinkExceptions().forEach(ex -> {
-                functionStatusBuilder.addLatestSinkExceptions(ex);
-            });
-            functionStatusBuilder.setAverageLatency(stats.getAvgProcessLatency());
-            functionStatusBuilder.setLastInvocationTime((long) stats.getLastInvocation());
+        if (isInitialized) {
+            try {
+                statsLock.readLock().lock();
+
+                functionStatusBuilder.setNumReceived((long) stats.getTotalRecordsReceived());
+                functionStatusBuilder.setNumSuccessfullyProcessed((long) stats.getTotalProcessedSuccessfully());
+                functionStatusBuilder.setNumUserExceptions((long) stats.getTotalUserExceptions());
+                stats.getLatestUserExceptions().forEach(ex -> {
+                    functionStatusBuilder.addLatestUserExceptions(ex);
+                });
+                functionStatusBuilder.setNumSystemExceptions((long) stats.getTotalSysExceptions());
+                stats.getLatestSystemExceptions().forEach(ex -> {
+                    functionStatusBuilder.addLatestSystemExceptions(ex);
+                });
+                stats.getLatestSourceExceptions().forEach(ex -> {
+                    functionStatusBuilder.addLatestSourceExceptions(ex);
+                });
+                stats.getLatestSinkExceptions().forEach(ex -> {
+                    functionStatusBuilder.addLatestSinkExceptions(ex);
+                });
+                functionStatusBuilder.setAverageLatency(stats.getAvgProcessLatency());
+                functionStatusBuilder.setLastInvocationTime((long) stats.getLastInvocation());
+            } finally {
+                statsLock.readLock().unlock();
+            }
         }
         return functionStatusBuilder;
     }