You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/12/20 19:14:52 UTC
[pulsar] branch master updated: Allow stats operations not to be
blocked in functions (#9005)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 31f7d70 Allow stats operations not to be blocked in functions (#9005)
31f7d70 is described below
commit 31f7d70bd4fdde4c8a272714dece8d806e31ebae
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sun Dec 20 11:14:18 2020 -0800
Allow stats operations not to be blocked in functions (#9005)
Co-authored-by: Jerry Peng <je...@splunk.com>
---
.../functions/instance/JavaInstanceRunnable.java | 133 +++++++++++++--------
1 file changed, 85 insertions(+), 48 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index fa68656..fe476f7 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -29,6 +29,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import net.jodah.typetools.TypeResolver;
@@ -86,7 +89,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
// input topic consumer & output topic producer
private final PulsarClientImpl client;
- //private final Map<String, PulsarClient> pulsarClientMap;
private LogAppender logAppender;
@@ -122,6 +124,13 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private ClassLoader functionClassLoader;
private String narExtractionDirectory;
+ // a flag to determine if member variables have been initialized as part of setup().
+ // used for out of band API calls like operations involving stats
+ private transient boolean isInitialized = false;
+
+ // a read write lock for stats operations
+ private ReadWriteLock statsLock = new ReentrantReadWriteLock();
+
public JavaInstanceRunnable(InstanceConfig instanceConfig,
FunctionCacheManager fnCache,
String jarFile,
@@ -216,6 +225,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
setupLogHandler();
javaInstance = new JavaInstance(contextImpl, object, instanceConfig);
+
+ // to signal member variables are initialized
+ isInitialized = true;
}
ContextImpl setupContext() {
@@ -404,12 +416,14 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
/**
- * NOTE: this method is be syncrhonized because it is potentially called by two different places
+ * NOTE: this method is synchronized because it is potentially called by two different places
* one inside the run/finally clause and one inside the ThreadRuntime::stop
*/
@Override
synchronized public void close() {
+ isInitialized = false;
+
if (stats != null) {
stats.close();
stats = null;
@@ -466,49 +480,67 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
}
- synchronized public String getStatsAsString() throws IOException {
- if (stats != null) {
- return stats.getStatsAsString();
- } else {
- return "";
+ public String getStatsAsString() throws IOException {
+ if (isInitialized) {
+ try {
+ statsLock.readLock().lock();
+ return stats.getStatsAsString();
+ } finally {
+ statsLock.readLock().unlock();
+ }
}
+ return "";
}
- // This method is synchronized because it is using the stats variable
- synchronized public InstanceCommunication.MetricsData getAndResetMetrics() {
- InstanceCommunication.MetricsData metricsData = internalGetMetrics();
- internalResetMetrics();
- return metricsData;
+ public InstanceCommunication.MetricsData getAndResetMetrics() {
+ if (isInitialized) {
+ try {
+ statsLock.writeLock().lock();
+ InstanceCommunication.MetricsData metricsData = internalGetMetrics();
+ internalResetMetrics();
+ return metricsData;
+ } finally {
+ statsLock.writeLock().unlock();
+ }
+ }
+ return InstanceCommunication.MetricsData.getDefaultInstance();
}
- // This method is synchronized because it is using the stats and javaInstance variables
- synchronized public InstanceCommunication.MetricsData getMetrics() {
- return internalGetMetrics();
+ public InstanceCommunication.MetricsData getMetrics() {
+ if (isInitialized) {
+ try {
+ statsLock.readLock().lock();
+ return internalGetMetrics();
+ } finally {
+ statsLock.readLock().unlock();
+ }
+ }
+ return InstanceCommunication.MetricsData.getDefaultInstance();
}
- // This method is synchronized because it is using the stats and javaInstance variables
- synchronized public void resetMetrics() {
- internalResetMetrics();
+ public void resetMetrics() {
+ if (isInitialized) {
+ try {
+ statsLock.writeLock().lock();
+ internalResetMetrics();
+ } finally {
+ statsLock.writeLock().unlock();
+ }
+ }
}
private InstanceCommunication.MetricsData internalGetMetrics() {
InstanceCommunication.MetricsData.Builder bldr = createMetricsDataBuilder();
- if (javaInstance != null) {
- Map<String, Double> userMetrics = javaInstance.getMetrics();
- if (userMetrics != null) {
- bldr.putAllUserMetrics(userMetrics);
- }
+ Map<String, Double> userMetrics = javaInstance.getMetrics();
+ if (userMetrics != null) {
+ bldr.putAllUserMetrics(userMetrics);
}
return bldr.build();
}
private void internalResetMetrics() {
- if (stats != null) {
stats.reset();
- }
- if (javaInstance != null) {
javaInstance.resetMetrics();
- }
}
private Builder createMetricsDataBuilder() {
@@ -531,28 +563,33 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
return bldr;
}
- // This method is synchronized because it is using the stats variable
- synchronized public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
+ public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
- if (stats != null) {
- functionStatusBuilder.setNumReceived((long) stats.getTotalRecordsReceived());
- functionStatusBuilder.setNumSuccessfullyProcessed((long) stats.getTotalProcessedSuccessfully());
- functionStatusBuilder.setNumUserExceptions((long) stats.getTotalUserExceptions());
- stats.getLatestUserExceptions().forEach(ex -> {
- functionStatusBuilder.addLatestUserExceptions(ex);
- });
- functionStatusBuilder.setNumSystemExceptions((long) stats.getTotalSysExceptions());
- stats.getLatestSystemExceptions().forEach(ex -> {
- functionStatusBuilder.addLatestSystemExceptions(ex);
- });
- stats.getLatestSourceExceptions().forEach(ex -> {
- functionStatusBuilder.addLatestSourceExceptions(ex);
- });
- stats.getLatestSinkExceptions().forEach(ex -> {
- functionStatusBuilder.addLatestSinkExceptions(ex);
- });
- functionStatusBuilder.setAverageLatency(stats.getAvgProcessLatency());
- functionStatusBuilder.setLastInvocationTime((long) stats.getLastInvocation());
+ if (isInitialized) {
+ try {
+ statsLock.readLock().lock();
+
+ functionStatusBuilder.setNumReceived((long) stats.getTotalRecordsReceived());
+ functionStatusBuilder.setNumSuccessfullyProcessed((long) stats.getTotalProcessedSuccessfully());
+ functionStatusBuilder.setNumUserExceptions((long) stats.getTotalUserExceptions());
+ stats.getLatestUserExceptions().forEach(ex -> {
+ functionStatusBuilder.addLatestUserExceptions(ex);
+ });
+ functionStatusBuilder.setNumSystemExceptions((long) stats.getTotalSysExceptions());
+ stats.getLatestSystemExceptions().forEach(ex -> {
+ functionStatusBuilder.addLatestSystemExceptions(ex);
+ });
+ stats.getLatestSourceExceptions().forEach(ex -> {
+ functionStatusBuilder.addLatestSourceExceptions(ex);
+ });
+ stats.getLatestSinkExceptions().forEach(ex -> {
+ functionStatusBuilder.addLatestSinkExceptions(ex);
+ });
+ functionStatusBuilder.setAverageLatency(stats.getAvgProcessLatency());
+ functionStatusBuilder.setLastInvocationTime((long) stats.getLastInvocation());
+ } finally {
+ statsLock.readLock().unlock();
+ }
}
return functionStatusBuilder;
}