You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/05/22 18:32:29 UTC
[pulsar] branch master updated: Fix null pointer when getting
function instance metrics. (#7010)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 20320c7 Fix null pointer when getting function instance metrics. (#7010)
20320c7 is described below
commit 20320c7ca15498f5655668e99cce286be381c76d
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri May 22 11:32:17 2020 -0700
Fix null pointer when getting function instance metrics. (#7010)
* Fix null pointer when getting function instance metrics.
* Made more functions sync
* Made the remaining public interface synchronized
* Made stats class public method synchronized so they are thread safe.
* Made setup synchronized so that it and close won't run together
* Undo making stats sync until we resolve differences
* Incorporated feedback
Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
.../functions/instance/JavaInstanceRunnable.java | 75 ++++++++++++++--------
.../functions/runtime/thread/ThreadRuntime.java | 2 +-
2 files changed, 51 insertions(+), 26 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 0fa4fc6..892c7e5 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -24,6 +24,8 @@ import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.netty.buffer.ByteBuf;
import io.prometheus.client.CollectorRegistry;
+
+import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import lombok.AccessLevel;
import lombok.Getter;
@@ -102,9 +104,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
// provide tables for storing states
private final String stateStorageServiceUrl;
- @Getter(AccessLevel.PACKAGE)
private StorageClient storageClient;
- @Getter(AccessLevel.PACKAGE)
private Table<ByteBuf, ByteBuf> stateTable;
private JavaInstance javaInstance;
@@ -112,7 +112,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private Throwable deathException;
// function stats
- @Getter
private ComponentStatsManager stats;
private Record<?> currentRecord;
@@ -178,7 +177,17 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
/**
* NOTE: this method should be called in the instance thread, in order to make class loading work.
*/
- JavaInstance setupJavaInstance() throws Exception {
+ synchronized private void setup() throws Exception {
+
+ this.instanceCache = InstanceCache.getInstanceCache();
+
+ if (this.collectorRegistry == null) {
+ this.collectorRegistry = new CollectorRegistry();
+ }
+ this.stats = ComponentStatsManager.getStatsManager(this.collectorRegistry, this.metricsLabels,
+ this.instanceCache.getScheduledExecutorService(),
+ this.componentType);
+
// initialize the thread context
ThreadContext.put("function", FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
ThreadContext.put("functionname", instanceConfig.getFunctionDetails().getName());
@@ -218,7 +227,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
// start any log topic handler
setupLogHandler();
- return new JavaInstance(contextImpl, object, instanceConfig);
+ javaInstance = new JavaInstance(contextImpl, object, instanceConfig);
}
ContextImpl setupContext() {
@@ -234,16 +243,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
@Override
public void run() {
try {
- this.instanceCache = InstanceCache.getInstanceCache();
-
- if (this.collectorRegistry == null) {
- this.collectorRegistry = new CollectorRegistry();
- }
- this.stats = ComponentStatsManager.getStatsManager(this.collectorRegistry, this.metricsLabels,
- this.instanceCache.getScheduledExecutorService(),
- this.componentType);
-
- javaInstance = setupJavaInstance();
+ setup();
+
while (true) {
currentRecord = readInput();
@@ -546,13 +547,32 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
}
- public InstanceCommunication.MetricsData getAndResetMetrics() {
- InstanceCommunication.MetricsData metricsData = getMetrics();
- stats.reset();
+ synchronized public String getStatsAsString() throws IOException {
+ if (stats != null) {
+ return stats.getStatsAsString();
+ } else {
+ return "";
+ }
+ }
+
+ // This method is synchronized because it is using the stats variable
+ synchronized public InstanceCommunication.MetricsData getAndResetMetrics() {
+ InstanceCommunication.MetricsData metricsData = internalGetMetrics();
+ internalResetMetrics();
return metricsData;
}
- public InstanceCommunication.MetricsData getMetrics() {
+ // This method is synchronized because it is using the stats and javaInstance variables
+ synchronized public InstanceCommunication.MetricsData getMetrics() {
+ return internalGetMetrics();
+ }
+
+ // This method is synchronized because it is using the stats and javaInstance variables
+ synchronized public void resetMetrics() {
+ internalResetMetrics();
+ }
+
+ private InstanceCommunication.MetricsData internalGetMetrics() {
InstanceCommunication.MetricsData.Builder bldr = createMetricsDataBuilder();
if (javaInstance != null) {
Map<String, Double> userMetrics = javaInstance.getMetrics();
@@ -563,9 +583,13 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
return bldr.build();
}
- public void resetMetrics() {
- stats.reset();
- javaInstance.resetMetrics();
+ private void internalResetMetrics() {
+ if (stats != null) {
+ stats.reset();
+ }
+ if (javaInstance != null) {
+ javaInstance.resetMetrics();
+ }
}
private Builder createMetricsDataBuilder() {
@@ -588,7 +612,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
return bldr;
}
- public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
+ // This method is synchronized because it is using the stats variable
+ synchronized public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
if (stats != null) {
functionStatusBuilder.setNumReceived((long) stats.getTotalRecordsReceived());
@@ -643,7 +668,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
config.getRootLogger().removeAppender(logAppender.getName());
}
- public void setupInput(ContextImpl contextImpl) throws Exception {
+ private void setupInput(ContextImpl contextImpl) throws Exception {
SourceSpec sourceSpec = this.instanceConfig.getFunctionDetails().getSource();
Object object;
@@ -745,7 +770,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
}
- public void setupOutput(ContextImpl contextImpl) throws Exception {
+ private void setupOutput(ContextImpl contextImpl) throws Exception {
SinkSpec sinkSpec = this.instanceConfig.getFunctionDetails().getSink();
Object object;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
index ffbed6b..5cc74c5 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
@@ -175,7 +175,7 @@ public class ThreadRuntime implements Runtime {
@Override
public String getPrometheusMetrics() throws IOException {
- return javaInstanceRunnable.getStats().getStatsAsString();
+ return javaInstanceRunnable.getStatsAsString();
}
@Override