You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2021/04/19 22:34:13 UTC
[pulsar] branch master updated: Fix Pulsar Function localrun with
multiple instances and metrics server is enabled (#10208)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7a89212 Fix Pulsar Function localrun with multiple instances and metrics server is enabled (#10208)
7a89212 is described below
commit 7a89212afabd044c922a039c897df966557dff56
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Mon Apr 19 15:33:08 2021 -0700
Fix Pulsar Function localrun with multiple instances and metrics server is enabled (#10208)
---
.../worker/PulsarFunctionLocalRunTest.java | 255 ++++++++++++++-------
.../functions/worker/PulsarFunctionTestUtils.java | 1 -
.../pulsar/functions/instance/ContextImpl.java | 23 +-
.../functions/instance/JavaInstanceRunnable.java | 8 +-
.../instance/stats/ComponentStatsManager.java | 6 +-
.../instance/stats/FunctionCollectorRegistry.java | 43 ++++
.../stats/FunctionCollectorRegistryImpl.java | 42 ++++
.../instance/stats/FunctionStatsManager.java | 92 +++++---
.../functions/instance/stats/SinkStatsManager.java | 68 ++++--
.../instance/stats/SourceStatsManager.java | 68 ++++--
.../pulsar/functions/instance/ContextImplTest.java | 5 +-
.../org/apache/pulsar/functions/LocalRunner.java | 39 ++--
.../functions/runtime/JavaInstanceStarter.java | 3 +-
.../pulsar/functions/runtime/RuntimeUtils.java | 3 +-
.../functions/runtime/thread/ThreadRuntime.java | 5 +-
.../runtime/thread/ThreadRuntimeFactory.java | 7 +-
16 files changed, 473 insertions(+), 195 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index 5dc5197..1a00043 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -74,6 +74,8 @@ import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
@@ -394,7 +396,7 @@ public class PulsarFunctionLocalRunTest {
*
* @throws Exception
*/
- private void testE2EPulsarFunctionLocalRun(String jarFilePathUrl) throws Exception {
+ private void testE2EPulsarFunctionLocalRun(String jarFilePathUrl, int parallelism) throws Exception {
final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
@@ -417,6 +419,7 @@ public class PulsarFunctionLocalRunTest {
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
functionConfig.setJar(jarFilePathUrl);
+ functionConfig.setParallelism(parallelism);
int metricsPort = FunctionCommon.findAvailablePort();
@Cleanup
LocalRunner localRunner = LocalRunner.builder()
@@ -431,15 +434,25 @@ public class PulsarFunctionLocalRunTest {
.brokerServiceUrl(pulsar.getBrokerServiceUrlTls()).build();
localRunner.start(false);
- retryStrategically((test) -> {
+ Assert.assertTrue(retryStrategically((test) -> {
try {
- TopicStats stats = admin.topics().getStats(sourceTopic);
- return stats.subscriptions.get(subscriptionName) != null
- && !stats.subscriptions.get(subscriptionName).consumers.isEmpty();
+
+ boolean result = false;
+ TopicStats topicStats = admin.topics().getStats(sourceTopic);
+ if (topicStats.subscriptions.containsKey(subscriptionName)
+ && topicStats.subscriptions.get(subscriptionName).consumers.size() == parallelism) {
+ for (ConsumerStats consumerStats : topicStats.subscriptions.get(subscriptionName).consumers) {
+ result = consumerStats.availablePermits == 1000
+ && consumerStats.metadata != null
+ && consumerStats.metadata.containsKey("id")
+ && consumerStats.metadata.get("id").equals(String.format("%s/%s/%s", tenant, namespacePortion, functionName));
+ }
+ }
+ return result;
} catch (PulsarAdminException e) {
return false;
}
- }, 50, 150);
+ }, 50, 150));
// validate pulsar sink consumer has started on the topic
TopicStats stats = admin.topics().getStats(sourceTopic);
assertTrue(stats.subscriptions.get(subscriptionName) != null
@@ -475,16 +488,31 @@ public class PulsarFunctionLocalRunTest {
String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(metricsPort);
log.info("prometheus metrics: {}", prometheusMetrics);
- Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
- assertFalse(metrics.isEmpty());
-
- PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_function_processed_successfully_total");
- assertEquals(m.tags.get("cluster"), config.getClusterName());
- assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
- assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
- assertEquals(m.value, 5.0);
+ Map<String, PulsarFunctionTestUtils.Metric> metricsMap = new HashMap<>();
+ Arrays.asList(prometheusMetrics.split("\n")).forEach(line -> {
+ if (line.startsWith("pulsar_function_processed_successfully_total")) {
+ Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(line);
+ assertFalse(metrics.isEmpty());
+ PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_function_processed_successfully_total");
+ if (m != null) {
+ metricsMap.put(m.tags.get("instance_id"), m);
+ }
+ }
+ });
+ Assert.assertEquals(metricsMap.size(), parallelism);
+
+ double totalMsgRecv = 0.0;
+ for (int i = 0; i < parallelism; i++) {
+ PulsarFunctionTestUtils.Metric m = metricsMap.get(String.valueOf(i));
+ Assert.assertNotNull(m);
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), String.valueOf(i));
+ assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ totalMsgRecv += m.value;
+ }
+ Assert.assertEquals(totalMsgRecv, totalMsgs);
// stop functions
localRunner.stop();
@@ -523,6 +551,10 @@ public class PulsarFunctionLocalRunTest {
}
}
+ private void testE2EPulsarFunctionLocalRun(String jarFilePathUrl) throws Exception {
+ testE2EPulsarFunctionLocalRun(jarFilePathUrl, 1);
+ }
+
private void testAvroFunctionLocalRun(String jarFilePathUrl) throws Exception {
final String namespacePortion = "io";
@@ -665,7 +697,12 @@ public class PulsarFunctionLocalRunTest {
testE2EPulsarFunctionLocalRun(fileServer.getUrl("/pulsar-functions-api-examples.jar"));
}
- private void testPulsarSourceLocalRun(String jarFilePathUrl) throws Exception {
+ @Test(timeOut = 40000)
+ public void testE2EPulsarFunctionLocalRunMultipleInstances() throws Throwable {
+ runWithPulsarFunctionsClassLoader(() -> testE2EPulsarFunctionLocalRun(null, 2));
+ }
+
+ private void testPulsarSourceLocalRun(String jarFilePathUrl, int parallelism) throws Exception {
final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
final String sinkTopic = "persistent://" + replNamespace + "/output";
@@ -680,6 +717,8 @@ public class PulsarFunctionLocalRunTest {
}
sourceConfig.setArchive(jarFilePathUrl);
+ sourceConfig.setParallelism(parallelism);
+ int metricsPort = FunctionCommon.findAvailablePort();
@Cleanup
LocalRunner localRunner = LocalRunner.builder()
.sourceConfig(sourceConfig)
@@ -691,55 +730,84 @@ public class PulsarFunctionLocalRunTest {
.tlsHostNameVerificationEnabled(false)
.brokerServiceUrl(pulsar.getBrokerServiceUrlTls())
.connectorsDirectory(workerConfig.getConnectorsDirectory())
+ .metricsPortStart(metricsPort)
.build();
localRunner.start(false);
- retryStrategically((test) -> {
+ Assert.assertTrue(retryStrategically((test) -> {
try {
- return (admin.topics().getStats(sinkTopic).publishers.size() == 1);
+ return admin.topics().getStats(sinkTopic).publishers.size() == parallelism;
} catch (PulsarAdminException e) {
return false;
}
- }, 10, 150);
+ }, 10, 150));
- retryStrategically((test) -> {
+ Assert.assertTrue(retryStrategically((test) -> {
try {
+ boolean result = false;
TopicStats sourceStats = admin.topics().getStats(sinkTopic);
- return sourceStats.publishers.size() == 1
- && sourceStats.publishers.get(0).metadata != null
- && sourceStats.publishers.get(0).metadata.containsKey("id")
- && sourceStats.publishers.get(0).metadata.get("id").equals(String.format("%s/%s/%s", tenant, namespacePortion, sourceName));
+ if (sourceStats.publishers.size() == parallelism) {
+ for (PublisherStats publisher : sourceStats.publishers) {
+ result = publisher.metadata != null
+ && publisher.metadata.containsKey("id")
+ && publisher.metadata.get("id").equals(String.format("%s/%s/%s", tenant, namespacePortion, sourceName));
+ }
+ }
+
+ return result;
} catch (PulsarAdminException e) {
return false;
}
- }, 50, 150);
-
- TopicStats sourceStats = admin.topics().getStats(sinkTopic);
- assertEquals(sourceStats.publishers.size(), 1);
- assertNotNull(sourceStats.publishers.get(0).metadata);
- assertTrue(sourceStats.publishers.get(0).metadata.containsKey("id"));
- assertEquals(sourceStats.publishers.get(0).metadata.get("id"), String.format("%s/%s/%s", tenant, namespacePortion, sourceName));
+ }, 50, 150));
- retryStrategically((test) -> {
+ Assert.assertTrue(retryStrategically((test) -> {
try {
- return (admin.topics().getStats(sinkTopic).publishers.size() == 1)
+ return (admin.topics().getStats(sinkTopic).publishers.size() == parallelism)
&& (admin.topics().getInternalStats(sinkTopic, false).numberOfEntries > 4);
} catch (PulsarAdminException e) {
return false;
}
- }, 50, 150);
- assertEquals(admin.topics().getStats(sinkTopic).publishers.size(), 1);
+ }, 50, 150));
+ assertEquals(admin.topics().getStats(sinkTopic).publishers.size(), parallelism);
+
+ // validate prometheus metrics
+ String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(metricsPort);
+ log.info("prometheus metrics: {}", prometheusMetrics);
+
+ Map<String, PulsarFunctionTestUtils.Metric> metricsMap = new HashMap<>();
+ Arrays.asList(prometheusMetrics.split("\n")).forEach(line -> {
+ if (line.startsWith("pulsar_source_written_total")) {
+ Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(line);
+ assertFalse(metrics.isEmpty());
+ PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_source_written_total");
+ if (m != null) {
+ metricsMap.put(m.tags.get("instance_id"), m);
+ }
+ }
+ });
+ Assert.assertEquals(metricsMap.size(), parallelism);
+
+ for (int i = 0; i < parallelism; i++) {
+ PulsarFunctionTestUtils.Metric m = metricsMap.get(String.valueOf(i));
+ Assert.assertNotNull(m);
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), String.valueOf(i));
+ assertEquals(m.tags.get("name"), sourceName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+ assertTrue(m.value > 0.0);
+ }
localRunner.stop();
- retryStrategically((test) -> {
+ Assert.assertTrue(retryStrategically((test) -> {
try {
return (admin.topics().getStats(sinkTopic).publishers.size() == 0);
} catch (PulsarAdminException e) {
return e.getStatusCode() == 404;
}
- }, 10, 150);
+ }, 10, 150));
try {
assertEquals(admin.topics().getStats(sinkTopic).publishers.size(), 0);
@@ -750,6 +818,10 @@ public class PulsarFunctionLocalRunTest {
}
}
+ private void testPulsarSourceLocalRun(String jarFilePathUrl) throws Exception {
+ testPulsarSourceLocalRun(jarFilePathUrl, 1);
+ }
+
@Test(timeOut = 20000, groups = "builtin")
public void testPulsarSourceStatsBuiltin() throws Exception {
testPulsarSourceLocalRun(String.format("%s://data-generator", Utils.BUILTIN));
@@ -771,7 +843,12 @@ public class PulsarFunctionLocalRunTest {
testPulsarSourceLocalRun(fileServer.getUrl("/pulsar-io-data-generator.nar"));
}
- private void testPulsarSinkLocalRun(String jarFilePathUrl) throws Exception {
+ @Test(timeOut = 40000)
+ public void testPulsarSourceLocalRunMultipleInstances() throws Throwable {
+ runWithNarClassLoader(() -> testPulsarSourceLocalRun(null, 2));
+ }
+
+ private void testPulsarSinkLocalRun(String jarFilePathUrl, int parallelism) throws Exception {
final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
final String sourceTopic = "persistent://" + replNamespace + "/input";
@@ -794,6 +871,7 @@ public class PulsarFunctionLocalRunTest {
}
sinkConfig.setArchive(jarFilePathUrl);
+ sinkConfig.setParallelism(parallelism);
int metricsPort = FunctionCommon.findAvailablePort();
@Cleanup
LocalRunner localRunner = LocalRunner.builder()
@@ -811,58 +889,70 @@ public class PulsarFunctionLocalRunTest {
localRunner.start(false);
- retryStrategically((test) -> {
+ Assert.assertTrue(retryStrategically((test) -> {
try {
+ boolean result = false;
TopicStats topicStats = admin.topics().getStats(sourceTopic);
-
- return topicStats.subscriptions.containsKey(subscriptionName)
- && topicStats.subscriptions.get(subscriptionName).consumers.size() == 1
- && topicStats.subscriptions.get(subscriptionName).consumers.get(0).availablePermits == 1000;
-
+ if (topicStats.subscriptions.containsKey(subscriptionName)
+ && topicStats.subscriptions.get(subscriptionName).consumers.size() == parallelism) {
+ for (ConsumerStats consumerStats : topicStats.subscriptions.get(subscriptionName).consumers) {
+ result = consumerStats.availablePermits == 1000;
+ }
+ }
+ return result;
} catch (PulsarAdminException e) {
return false;
}
- }, 20, 150);
-
- TopicStats topicStats = admin.topics().getStats(sourceTopic);
- assertEquals(topicStats.subscriptions.size(), 1);
- assertTrue(topicStats.subscriptions.containsKey(subscriptionName));
- assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
- assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.get(0).availablePermits, 1000);
+ }, 20, 150));
int totalMsgs = 10;
for (int i = 0; i < totalMsgs; i++) {
String data = "my-message-" + i;
producer.newMessage().property(propertyKey, propertyValue).value(data).send();
}
- retryStrategically((test) -> {
+ Assert.assertTrue(retryStrategically((test) -> {
try {
SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
return subStats.unackedMessages == 0 && subStats.msgThroughputOut == totalMsgs;
} catch (PulsarAdminException e) {
return false;
}
- }, 5, 200);
+ }, 5, 200));
// validate prometheus metrics
String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(metricsPort);
log.info("prometheus metrics: {}", prometheusMetrics);
- Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
- assertFalse(metrics.isEmpty());
-
- PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_written_total");
- assertEquals(m.tags.get("cluster"), config.getClusterName());
- assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), sinkName);
- assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
- assertEquals(m.value, 10.0);
+ Map<String, PulsarFunctionTestUtils.Metric> metricsMap = new HashMap<>();
+ Arrays.asList(prometheusMetrics.split("\n")).forEach(line -> {
+ if (line.startsWith("pulsar_sink_written_total")) {
+ Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(line);
+ assertFalse(metrics.isEmpty());
+ PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_written_total");
+ if (m != null) {
+ metricsMap.put(m.tags.get("instance_id"), m);
+ }
+ }
+ });
+ Assert.assertEquals(metricsMap.size(), parallelism);
+
+ double totalNumRecvMsg = 0;
+ for (int i = 0; i < parallelism; i++) {
+ PulsarFunctionTestUtils.Metric m = metricsMap.get(String.valueOf(i));
+ Assert.assertNotNull(m);
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), String.valueOf(i));
+ assertEquals(m.tags.get("name"), sinkName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
+ totalNumRecvMsg += m.value;
+ }
+ assertEquals(totalNumRecvMsg, totalMsgs);
// stop sink
localRunner.stop();
- retryStrategically((test) -> {
+ Assert.assertTrue(retryStrategically((test) -> {
try {
TopicStats stats = admin.topics().getStats(sourceTopic);
return stats.subscriptions.get(subscriptionName) != null
@@ -870,14 +960,18 @@ public class PulsarFunctionLocalRunTest {
} catch (PulsarAdminException e) {
return false;
}
- }, 20, 150);
+ }, 20, 150));
- topicStats = admin.topics().getStats(sourceTopic);
+ TopicStats topicStats = admin.topics().getStats(sourceTopic);
assertTrue(topicStats.subscriptions.get(subscriptionName) != null
&& topicStats.subscriptions.get(subscriptionName).consumers.isEmpty());
}
+ private void testPulsarSinkLocalRun(String jarFilePathUrl) throws Exception {
+ testPulsarSinkLocalRun(jarFilePathUrl, 1);
+ }
+
@Test(timeOut = 20000, groups = "builtin")
public void testPulsarSinkStatsBuiltin() throws Exception {
testPulsarSinkLocalRun(String.format("%s://data-generator", Utils.BUILTIN));
@@ -888,6 +982,22 @@ public class PulsarFunctionLocalRunTest {
runWithNarClassLoader(() -> testPulsarSinkLocalRun(null));
}
+ @Test(timeOut = 20000)
+ public void testPulsarSinkStatsWithFile() throws Exception {
+ String jarFilePathUrl = getPulsarIODataGeneratorNar().toURI().toString();
+ testPulsarSinkLocalRun(jarFilePathUrl);
+ }
+
+ @Test(timeOut = 40000)
+ public void testPulsarSinkStatsWithUrl() throws Exception {
+ testPulsarSinkLocalRun(fileServer.getUrl("/pulsar-io-data-generator.nar"));
+ }
+
+ @Test(timeOut = 40000)
+ public void testPulsarSinkStatsMultipleInstances() throws Throwable {
+ runWithNarClassLoader(() -> testPulsarSinkLocalRun(null, 2));
+ }
+
private void runWithNarClassLoader(Assert.ThrowingRunnable throwingRunnable) throws Throwable {
ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
try (NarClassLoader classLoader = NarClassLoader.getFromArchive(getPulsarIODataGeneratorNar(), Collections.emptySet(), originalClassLoader, NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR)) {
@@ -909,15 +1019,4 @@ public class PulsarFunctionLocalRunTest {
Thread.currentThread().setContextClassLoader(originalClassLoader);
}
}
-
- @Test(timeOut = 20000)
- public void testPulsarSinkStatsWithFile() throws Exception {
- String jarFilePathUrl = getPulsarIODataGeneratorNar().toURI().toString();
- testPulsarSinkLocalRun(jarFilePathUrl);
- }
-
- @Test(timeOut = 40000)
- public void testPulsarSinkStatsWithUrl() throws Exception {
- testPulsarSinkLocalRun(fileServer.getUrl("/pulsar-io-data-generator.nar"));
- }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestUtils.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestUtils.java
index 8f5316c..46aa37d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestUtils.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestUtils.java
@@ -106,7 +106,6 @@ public class PulsarFunctionTestUtils {
parsed.put(name, m);
});
- log.info("parsed metrics: {}", parsed);
return parsed;
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index e927332..357bbcf 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -56,6 +56,7 @@ import org.apache.pulsar.functions.api.StateStore;
import org.apache.pulsar.functions.instance.state.DefaultStateStore;
import org.apache.pulsar.functions.instance.state.StateManager;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
+import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.instance.stats.FunctionStatsManager;
import org.apache.pulsar.functions.instance.stats.SinkStatsManager;
import org.apache.pulsar.functions.instance.stats.SourceStatsManager;
@@ -115,7 +116,7 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
private final Function.FunctionDetails.ComponentType componentType;
public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
- SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels,
+ SecretsProvider secretsProvider, FunctionCollectorRegistry collectorRegistry, String[] metricsLabels,
Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager,
StateManager stateManager, PulsarAdmin pulsarAdmin) {
this.config = config;
@@ -172,15 +173,17 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
default:
throw new RuntimeException("Unknown component type: " + componentType);
}
- this.userMetricsSummary = Summary.build()
- .name(prefix + ComponentStatsManager.USER_METRIC_PREFIX)
- .help("User defined metric.")
- .labelNames(userMetricsLabelNames)
- .quantile(0.5, 0.01)
- .quantile(0.9, 0.01)
- .quantile(0.99, 0.01)
- .quantile(0.999, 0.01)
- .register(collectorRegistry);
+ this.userMetricsSummary = collectorRegistry.registerIfNotExist(
+ prefix + ComponentStatsManager.USER_METRIC_PREFIX,
+ Summary.build()
+ .name(prefix + ComponentStatsManager.USER_METRIC_PREFIX)
+ .help("User defined metric.")
+ .labelNames(userMetricsLabelNames)
+ .quantile(0.5, 0.01)
+ .quantile(0.9, 0.01)
+ .quantile(0.99, 0.01)
+ .quantile(0.999, 0.01)
+ .create());
this.componentType = componentType;
this.stateManager = stateManager;
this.defaultStateStore = (DefaultStateStore) stateManager.getStore(
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index d761ccb..36f8608 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.functions.instance.state.StateManager;
import org.apache.pulsar.functions.instance.state.StateStoreContextImpl;
import org.apache.pulsar.functions.instance.state.StateStoreProvider;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
+import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -106,7 +107,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private final SecretsProvider secretsProvider;
- private CollectorRegistry collectorRegistry;
+ private FunctionCollectorRegistry collectorRegistry;
private final String[] metricsLabels;
private InstanceCache instanceCache;
@@ -130,14 +131,13 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
PulsarAdmin pulsarAdmin,
String stateStorageServiceUrl,
SecretsProvider secretsProvider,
- CollectorRegistry collectorRegistry,
+ FunctionCollectorRegistry collectorRegistry,
ClassLoader functionClassLoader) {
this.instanceConfig = instanceConfig;
this.client = (PulsarClientImpl) pulsarClient;
this.pulsarAdmin = pulsarAdmin;
this.stateStorageServiceUrl = stateStorageServiceUrl;
this.secretsProvider = secretsProvider;
- this.collectorRegistry = collectorRegistry;
this.functionClassLoader = functionClassLoader;
this.metricsLabels = new String[]{
instanceConfig.getFunctionDetails().getTenant(),
@@ -171,7 +171,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
this.instanceCache = InstanceCache.getInstanceCache();
if (this.collectorRegistry == null) {
- this.collectorRegistry = new CollectorRegistry();
+ this.collectorRegistry = FunctionCollectorRegistry.getDefaultImplementation();
}
this.stats = ComponentStatsManager.getStatsManager(this.collectorRegistry, this.metricsLabels,
this.instanceCache.getScheduledExecutorService(),
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
index cbdcc0f..d822aa0 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
@@ -38,7 +38,7 @@ public abstract class ComponentStatsManager implements AutoCloseable {
protected ScheduledFuture<?> scheduledFuture;
- protected final CollectorRegistry collectorRegistry;
+ protected final FunctionCollectorRegistry collectorRegistry;
protected final EvictingQueue EMPTY_QUEUE = EvictingQueue.create(0);
@@ -53,7 +53,7 @@ public abstract class ComponentStatsManager implements AutoCloseable {
exceptionMetricsLabelNames[metricsLabelNames.length] = "error";
}
- public static ComponentStatsManager getStatsManager(CollectorRegistry collectorRegistry,
+ public static ComponentStatsManager getStatsManager(FunctionCollectorRegistry collectorRegistry,
String[] metricsLabels,
ScheduledExecutorService scheduledExecutorService,
Function.FunctionDetails.ComponentType componentType) {
@@ -69,7 +69,7 @@ public abstract class ComponentStatsManager implements AutoCloseable {
}
}
- public ComponentStatsManager(CollectorRegistry collectorRegistry,
+ public ComponentStatsManager(FunctionCollectorRegistry collectorRegistry,
String[] metricsLabels,
ScheduledExecutorService scheduledExecutorService) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionCollectorRegistry.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionCollectorRegistry.java
new file mode 100644
index 0000000..d297639
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionCollectorRegistry.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.instance.stats;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+
+/**
+ * Internal representation of Prometheus Collector Registry
+ */
+public abstract class FunctionCollectorRegistry extends CollectorRegistry {
+ public static FunctionCollectorRegistry getDefaultImplementation() {
+ return new FunctionCollectorRegistryImpl();
+ }
+
+ /**
+ * Register a metric if it does not yet exist. If it does exist, then return the existing metric.
+ * Currently, only needed by the LocalRunner when running in threaded mode and exposing metrics via an HTTP server.
+ * This method helps resolve the conflict in which multiple instances within the LocalRunner process try to register the same metric.
+ * @param metricName the name of the metric
+ * @param collector the metric object e.g. Counter, Gauge, etc.
+ * @param <T> the concrete collector type
+ * @return If the metric with the name `metricName` already exists, return the existing metric object. If not, register `collector` and return it
+ */
+ public abstract <T extends Collector> T registerIfNotExist(String metricName, T collector);
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionCollectorRegistryImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionCollectorRegistryImpl.java
new file mode 100644
index 0000000..9f003a4
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionCollectorRegistryImpl.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.instance.stats;
+
+import io.prometheus.client.Collector;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class FunctionCollectorRegistryImpl extends FunctionCollectorRegistry {
+
+ private final Map<String, Collector> namesToCollectors = new HashMap<String, Collector>();
+
+ public Collector registerIfNotExist(String metricName, Collector collector) {
+ synchronized (this) {
+ Collector existingCollector = namesToCollectors.get(metricName);
+ if (existingCollector == null) {
+ namesToCollectors.put(metricName, collector);
+ super.register(collector);
+ return collector;
+ }
+ return existingCollector;
+ }
+ }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
index f02b850..08ea9ea 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
@@ -119,33 +119,41 @@ public class FunctionStatsManager extends ComponentStatsManager{
private final RateLimiter sysExceptionRateLimiter;
- public FunctionStatsManager(CollectorRegistry collectorRegistry,
+ public FunctionStatsManager(FunctionCollectorRegistry collectorRegistry,
String[] metricsLabels,
ScheduledExecutorService scheduledExecutorService) {
super(collectorRegistry, metricsLabels, scheduledExecutorService);
- statTotalProcessedSuccessfully = Counter.build()
+ statTotalProcessedSuccessfully = collectorRegistry.registerIfNotExist(
+ PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL,
+ Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL)
.help("Total number of messages processed successfully.")
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statTotalProcessedSuccessfully = statTotalProcessedSuccessfully.labels(metricsLabels);
- statTotalSysExceptions = Counter.build()
+ statTotalSysExceptions = collectorRegistry.registerIfNotExist(
+ PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL ,
+ Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL)
.help("Total number of system exceptions.")
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statTotalSysExceptions = statTotalSysExceptions.labels(metricsLabels);
- statTotalUserExceptions = Counter.build()
+ statTotalUserExceptions = collectorRegistry.registerIfNotExist(
+ PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL,
+ Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL)
.help("Total number of user exceptions.")
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statTotalUserExceptions = statTotalUserExceptions.labels(metricsLabels);
- statProcessLatency = Summary.build()
+ statProcessLatency = collectorRegistry.registerIfNotExist(
+ PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS,
+ Summary.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS)
.help("Process latency in milliseconds.")
.quantile(0.5, 0.01)
@@ -153,45 +161,57 @@ public class FunctionStatsManager extends ComponentStatsManager{
.quantile(0.99, 0.01)
.quantile(0.999, 0.01)
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statProcessLatency = statProcessLatency.labels(metricsLabels);
- statlastInvocation = Gauge.build()
+ statlastInvocation = collectorRegistry.registerIfNotExist(
+ PULSAR_FUNCTION_METRICS_PREFIX + LAST_INVOCATION,
+ Gauge.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + LAST_INVOCATION)
.help("The timestamp of the last invocation of the function.")
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statlastInvocation = statlastInvocation.labels(metricsLabels);
- statTotalRecordsReceived = Counter.build()
+ statTotalRecordsReceived = collectorRegistry.registerIfNotExist(
+ PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL,
+ Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL)
.help("Total number of messages received from source.")
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statTotalRecordsReceived = statTotalRecordsReceived.labels(metricsLabels);
- statTotalProcessedSuccessfully1min = Counter.build()
+ statTotalProcessedSuccessfully1min = collectorRegistry.registerIfNotExist(
+ PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL_1min,
+ Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL_1min)
.help("Total number of messages processed successfully in the last 1 minute.")
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statTotalProcessedSuccessfully1min = statTotalProcessedSuccessfully1min.labels(metricsLabels);
- statTotalSysExceptions1min = Counter.build()
+ statTotalSysExceptions1min = collectorRegistry.registerIfNotExist(
+ PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min,
+ Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min)
.help("Total number of system exceptions in the last 1 minute.")
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statTotalSysExceptions1min = statTotalSysExceptions1min.labels(metricsLabels);
- statTotalUserExceptions1min = Counter.build()
+ statTotalUserExceptions1min = collectorRegistry.registerIfNotExist(
+ PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL_1min,
+ Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL_1min)
.help("Total number of user exceptions in the last 1 minute.")
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statTotalUserExceptions1min = statTotalUserExceptions1min.labels(metricsLabels);
- statProcessLatency1min = Summary.build()
+ statProcessLatency1min = collectorRegistry.registerIfNotExist(
+ PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS_1min,
+ Summary.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS_1min)
.help("Process latency in milliseconds in the last 1 minute.")
.quantile(0.5, 0.01)
@@ -199,38 +219,48 @@ public class FunctionStatsManager extends ComponentStatsManager{
.quantile(0.99, 0.01)
.quantile(0.999, 0.01)
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statProcessLatency1min = statProcessLatency1min.labels(metricsLabels);
- statTotalRecordsReceived1min = Counter.build()
+ statTotalRecordsReceived1min = collectorRegistry.registerIfNotExist(
+ PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL_1min,
+ Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL_1min)
.help("Total number of messages received from source in the last 1 minute.")
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statTotalRecordsReceived1min = statTotalRecordsReceived1min.labels(metricsLabels);
- userExceptions = Gauge.build()
+ userExceptions = collectorRegistry.registerIfNotExist(
+ PULSAR_FUNCTION_METRICS_PREFIX + "user_exception",
+ Gauge.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + "user_exception")
.labelNames(exceptionMetricsLabelNames)
.help("Exception from user code.")
- .register(collectorRegistry);
- sysExceptions = Gauge.build()
+ .create());
+ sysExceptions = collectorRegistry.registerIfNotExist(
+ PULSAR_FUNCTION_METRICS_PREFIX + "system_exception",
+ Gauge.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + "system_exception")
.labelNames(exceptionMetricsLabelNames)
.help("Exception from system code.")
- .register(collectorRegistry);
+ .create());
- sourceExceptions = Gauge.build()
+ sourceExceptions = collectorRegistry.registerIfNotExist(
+ PULSAR_FUNCTION_METRICS_PREFIX + "source_exception",
+ Gauge.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + "source_exception")
.labelNames(exceptionMetricsLabelNames)
.help("Exception from source.")
- .register(collectorRegistry);
+ .create());
- sinkExceptions = Gauge.build()
+ sinkExceptions = collectorRegistry.registerIfNotExist(
+ PULSAR_FUNCTION_METRICS_PREFIX + "sink_exception",
+ Gauge.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + "sink_exception")
.labelNames(exceptionMetricsLabelNames)
.help("Exception from sink.")
- .register(collectorRegistry);
+ .create());
userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
index 401aa34..46999cd 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
@@ -95,84 +95,106 @@ public class SinkStatsManager extends ComponentStatsManager {
private final RateLimiter sinkExceptionRateLimiter;
- public SinkStatsManager(CollectorRegistry collectorRegistry, String[] metricsLabels, ScheduledExecutorService
+ public SinkStatsManager(FunctionCollectorRegistry collectorRegistry, String[] metricsLabels, ScheduledExecutorService
scheduledExecutorService) {
super(collectorRegistry, metricsLabels, scheduledExecutorService);
- statTotalRecordsReceived = Counter.build()
+ statTotalRecordsReceived = collectorRegistry.registerIfNotExist(
+ PULSAR_SINK_METRICS_PREFIX + RECEIVED_TOTAL,
+ Counter.build()
.name(PULSAR_SINK_METRICS_PREFIX + RECEIVED_TOTAL)
.help("Total number of records sink has received from Pulsar topic(s).")
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statTotalRecordsReceived = statTotalRecordsReceived.labels(metricsLabels);
- statTotalSysExceptions = Counter.build()
+ statTotalSysExceptions = collectorRegistry.registerIfNotExist(
+ PULSAR_SINK_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL,
+ Counter.build()
.name(PULSAR_SINK_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL)
.help("Total number of system exceptions.")
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statTotalSysExceptions = statTotalSysExceptions.labels(metricsLabels);
- statTotalSinkExceptions = Counter.build()
+ statTotalSinkExceptions = collectorRegistry.registerIfNotExist(
+ PULSAR_SINK_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL,
+ Counter.build()
.name(PULSAR_SINK_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL)
.help("Total number of sink exceptions.")
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statTotalSinkExceptions = statTotalSinkExceptions.labels(metricsLabels);
- statTotalWritten = Counter.build()
+ statTotalWritten = collectorRegistry.registerIfNotExist(
+ PULSAR_SINK_METRICS_PREFIX + WRITTEN_TOTAL,
+ Counter.build()
.name(PULSAR_SINK_METRICS_PREFIX + WRITTEN_TOTAL)
.help("Total number of records processed by sink.")
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statTotalWritten = statTotalWritten.labels(metricsLabels);
- statlastInvocation = Gauge.build()
+ statlastInvocation = collectorRegistry.registerIfNotExist(
+ PULSAR_SINK_METRICS_PREFIX + LAST_INVOCATION,
+ Gauge.build()
.name(PULSAR_SINK_METRICS_PREFIX + LAST_INVOCATION)
.help("The timestamp of the last invocation of the sink.")
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statlastInvocation = statlastInvocation.labels(metricsLabels);
- statTotalRecordsReceived1min = Counter.build()
+ statTotalRecordsReceived1min = collectorRegistry.registerIfNotExist(
+ PULSAR_SINK_METRICS_PREFIX + RECEIVED_TOTAL_1min,
+ Counter.build()
.name(PULSAR_SINK_METRICS_PREFIX + RECEIVED_TOTAL_1min)
.help("Total number of messages sink has received from Pulsar topic(s) in the last 1 minute.")
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statTotalRecordsReceived1min = statTotalRecordsReceived1min.labels(metricsLabels);
- statTotalSysExceptions1min = Counter.build()
+ statTotalSysExceptions1min = collectorRegistry.registerIfNotExist(
+ PULSAR_SINK_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min,
+ Counter.build()
.name(PULSAR_SINK_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min)
.help("Total number of system exceptions in the last 1 minute.")
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statTotalSysExceptions1min = statTotalSysExceptions1min.labels(metricsLabels);
- statTotalSinkExceptions1min = Counter.build()
+ statTotalSinkExceptions1min = collectorRegistry.registerIfNotExist(
+ PULSAR_SINK_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL_1min,
+ Counter.build()
.name(PULSAR_SINK_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL_1min)
.help("Total number of sink exceptions in the last 1 minute.")
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statTotalSinkExceptions1min = statTotalSinkExceptions1min.labels(metricsLabels);
- statTotalWritten1min = Counter.build()
+ statTotalWritten1min = collectorRegistry.registerIfNotExist(
+ PULSAR_SINK_METRICS_PREFIX + WRITTEN_TOTAL_1min,
+ Counter.build()
.name(PULSAR_SINK_METRICS_PREFIX + WRITTEN_TOTAL_1min)
.help("Total number of records processed by sink the last 1 minute.")
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statTotalWritten1min = statTotalWritten1min.labels(metricsLabels);
- sysExceptions = Gauge.build()
+ sysExceptions = collectorRegistry.registerIfNotExist(
+ PULSAR_SINK_METRICS_PREFIX + "system_exception",
+ Gauge.build()
.name(PULSAR_SINK_METRICS_PREFIX + "system_exception")
.labelNames(exceptionMetricsLabelNames)
.help("Exception from system code.")
- .register(collectorRegistry);
+ .create());
- sinkExceptions = Gauge.build()
+ sinkExceptions = collectorRegistry.registerIfNotExist(
+ PULSAR_SINK_METRICS_PREFIX + "sink_exception",
+ Gauge.build()
.name(PULSAR_SINK_METRICS_PREFIX + "sink_exception")
.labelNames(exceptionMetricsLabelNames)
.help("Exception from sink.")
- .register(collectorRegistry);
+ .create());
sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
index 287240c..e79e0b5 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
@@ -94,84 +94,106 @@ public class SourceStatsManager extends ComponentStatsManager {
protected final RateLimiter sourceExceptionRateLimiter;
- public SourceStatsManager(CollectorRegistry collectorRegistry, String[] metricsLabels, ScheduledExecutorService
+ public SourceStatsManager(FunctionCollectorRegistry collectorRegistry, String[] metricsLabels, ScheduledExecutorService
scheduledExecutorService) {
super(collectorRegistry, metricsLabels, scheduledExecutorService);
- statTotalRecordsReceived = Counter.build()
+ statTotalRecordsReceived = collectorRegistry.registerIfNotExist(
+ PULSAR_SOURCE_METRICS_PREFIX + RECEIVED_TOTAL,
+ Counter.build()
.name(PULSAR_SOURCE_METRICS_PREFIX + RECEIVED_TOTAL)
.help("Total number of records received from source.")
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statTotalRecordsReceived = statTotalRecordsReceived.labels(metricsLabels);
- statTotalSysExceptions = Counter.build()
+ statTotalSysExceptions = collectorRegistry.registerIfNotExist(
+ PULSAR_SOURCE_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL,
+ Counter.build()
.name(PULSAR_SOURCE_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL)
.help("Total number of system exceptions.")
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statTotalSysExceptions = statTotalSysExceptions.labels(metricsLabels);
- statTotalSourceExceptions = Counter.build()
+ statTotalSourceExceptions = collectorRegistry.registerIfNotExist(
+ PULSAR_SOURCE_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL,
+ Counter.build()
.name(PULSAR_SOURCE_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL)
.help("Total number of source exceptions.")
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statTotalSourceExceptions = statTotalSourceExceptions.labels(metricsLabels);
- statTotalWritten = Counter.build()
+ statTotalWritten = collectorRegistry.registerIfNotExist(
+ PULSAR_SOURCE_METRICS_PREFIX + WRITTEN_TOTAL,
+ Counter.build()
.name(PULSAR_SOURCE_METRICS_PREFIX + WRITTEN_TOTAL)
.help("Total number of records written to a Pulsar topic.")
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statTotalWritten = statTotalWritten.labels(metricsLabels);
- statlastInvocation = Gauge.build()
+ statlastInvocation = collectorRegistry.registerIfNotExist(
+ PULSAR_SOURCE_METRICS_PREFIX + LAST_INVOCATION,
+ Gauge.build()
.name(PULSAR_SOURCE_METRICS_PREFIX + LAST_INVOCATION)
.help("The timestamp of the last invocation of the source.")
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statlastInvocation = statlastInvocation.labels(metricsLabels);
- statTotalRecordsReceived1min = Counter.build()
+ statTotalRecordsReceived1min = collectorRegistry.registerIfNotExist(
+ PULSAR_SOURCE_METRICS_PREFIX + RECEIVED_TOTAL_1min,
+ Counter.build()
.name(PULSAR_SOURCE_METRICS_PREFIX + RECEIVED_TOTAL_1min)
.help("Total number of records received from source in the last 1 minute.")
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statTotalRecordsReceived1min = statTotalRecordsReceived1min.labels(metricsLabels);
- statTotalSysExceptions1min = Counter.build()
+ statTotalSysExceptions1min = collectorRegistry.registerIfNotExist(
+ PULSAR_SOURCE_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min,
+ Counter.build()
.name(PULSAR_SOURCE_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min)
.help("Total number of system exceptions in the last 1 minute.")
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statTotalSysExceptions1min = statTotalSysExceptions1min.labels(metricsLabels);
- statTotalSourceExceptions1min = Counter.build()
+ statTotalSourceExceptions1min = collectorRegistry.registerIfNotExist(
+ PULSAR_SOURCE_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL_1min,
+ Counter.build()
.name(PULSAR_SOURCE_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL_1min)
.help("Total number of source exceptions in the last 1 minute.")
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statTotalSourceExceptions1min = statTotalSourceExceptions1min.labels(metricsLabels);
- statTotalWritten1min = Counter.build()
+ statTotalWritten1min = collectorRegistry.registerIfNotExist(
+ PULSAR_SOURCE_METRICS_PREFIX + WRITTEN_TOTAL_1min,
+ Counter.build()
.name(PULSAR_SOURCE_METRICS_PREFIX + WRITTEN_TOTAL_1min)
.help("Total number of records written to a Pulsar topic in the last 1 minute.")
.labelNames(metricsLabelNames)
- .register(collectorRegistry);
+ .create());
_statTotalWritten1min = statTotalWritten1min.labels(metricsLabels);
- sysExceptions = Gauge.build()
+ sysExceptions = collectorRegistry.registerIfNotExist(
+ PULSAR_SOURCE_METRICS_PREFIX + "system_exception",
+ Gauge.build()
.name(PULSAR_SOURCE_METRICS_PREFIX + "system_exception")
.labelNames(exceptionMetricsLabelNames)
.help("Exception from system code.")
- .register(collectorRegistry);
+ .create());
- sourceExceptions = Gauge.build()
+ sourceExceptions = collectorRegistry.registerIfNotExist(
+ PULSAR_SOURCE_METRICS_PREFIX + "source_exception",
+ Gauge.build()
.name(PULSAR_SOURCE_METRICS_PREFIX + "source_exception")
.labelNames(exceptionMetricsLabelNames)
.help("Exception from source.")
- .register(collectorRegistry);
+ .create());
sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index ceb87c3..8bc8a54 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.state.BKStateStoreImpl;
import org.apache.pulsar.functions.instance.state.InstanceStateManager;
+import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
import org.slf4j.Logger;
@@ -92,7 +93,7 @@ public class ContextImplTest {
config,
logger,
client,
- new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0],
+ new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin);
context.setCurrentMessageContext((Record<String>) () -> null);
@@ -182,7 +183,7 @@ public class ContextImplTest {
config,
logger,
client,
- new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0],
+ new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin);
context.getPulsarAdmin();
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index f599139..c7e0c28 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -61,6 +61,7 @@ import org.apache.pulsar.common.nar.FileUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.runtime.RuntimeUtils;
@@ -171,7 +172,7 @@ public class LocalRunner implements AutoCloseable {
protected String secretsProviderClassName;
@Parameter(names = "--secretsProviderConfig", description = "Whats the config for the secrets provider", hidden = true)
protected String secretsProviderConfig;
- @Parameter(names = "--metricsPortStart", description = "The starting port range for metrics server", hidden = true)
+ @Parameter(names = "--metricsPortStart", description = "The starting port range for metrics server. When running instances as threads, one metrics server is used to host the stats for all instances.", hidden = true)
protected Integer metricsPortStart;
private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650";
@@ -485,7 +486,16 @@ public class LocalRunner implements AutoCloseable {
instanceConfig.setInstanceId(i + instanceIdOffset);
instanceConfig.setMaxBufferedTuples(1024);
instanceConfig.setPort(FunctionCommon.findAvailablePort());
- instanceConfig.setMetricsPort(FunctionCommon.findAvailablePort());
+
+ if (metricsPortStart != null) {
+ int metricsPort = metricsPortStart + i; // port for this instance: configured start offset by i
+ if (metricsPortStart < 0 || metricsPort < 0 || metricsPort > 65535) { // check the computed port, not only the start
+ throw new IllegalArgumentException("Metrics port need to be within the range of 0 and 65535");
+ }
+ instanceConfig.setMetricsPort(metricsPort);
+ } else {
+ instanceConfig.setMetricsPort(FunctionCommon.findAvailablePort());
+ }
instanceConfig.setClusterName("local");
if (functionConfig != null) {
instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
@@ -537,6 +547,13 @@ public class LocalRunner implements AutoCloseable {
int parallelism, int instanceIdOffset, String serviceUrl,
String stateStorageServiceUrl, AuthenticationConfig authConfig,
String userCodeFile) throws Exception {
+
+ if (metricsPortStart != null) {
+ if (metricsPortStart < 0 || metricsPortStart > 65535) {
+ throw new IllegalArgumentException("Metrics port need to be within the range of 0 and 65535");
+ }
+ }
+
SecretsProvider secretsProvider;
if (secretsProviderClassName != null) {
secretsProvider = (SecretsProvider) Reflections.createInstance(secretsProviderClassName, ClassLoader.getSystemClassLoader());
@@ -554,7 +571,7 @@ public class LocalRunner implements AutoCloseable {
}
// Collector Registry for prometheus metrics
- CollectorRegistry collectorRegistry = new CollectorRegistry();
+ FunctionCollectorRegistry collectorRegistry = FunctionCollectorRegistry.getDefaultImplementation();
RuntimeUtils.registerDefaultCollectors(collectorRegistry);
ThreadRuntimeFactory threadRuntimeFactory;
@@ -583,10 +600,7 @@ public class LocalRunner implements AutoCloseable {
instanceConfig.setInstanceId(i + instanceIdOffset);
instanceConfig.setMaxBufferedTuples(1024);
if (metricsPortStart != null) {
- if (metricsPortStart < 0 || metricsPortStart > 65535) {
- throw new IllegalArgumentException("Metrics port need to be within the range of 0 and 65535");
- }
- instanceConfig.setMetricsPort(metricsPortStart + i);
+ instanceConfig.setMetricsPort(metricsPortStart);
}
instanceConfig.setClusterName("local");
if (functionConfig != null) {
@@ -604,13 +618,12 @@ public class LocalRunner implements AutoCloseable {
30000);
spawners.add(runtimeSpawner);
runtimeSpawner.start();
+ }
- if (metricsPortStart != null) {
- // starting metrics server
- log.info("Starting metrics server on port {}", instanceConfig.getMetricsPort());
- new HTTPServer(new InetSocketAddress(instanceConfig.getMetricsPort()), collectorRegistry, true);
- }
-
+ if (metricsPortStart != null) {
+ // starting metrics server
+ log.info("Starting metrics server on port {}", metricsPortStart);
+ new HTTPServer(new InetSocketAddress(metricsPortStart), collectorRegistry, true);
}
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index 0de9782..170e606 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceCache;
import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
@@ -191,7 +192,7 @@ public class JavaInstanceStarter implements AutoCloseable {
secretsProvider.init(secretsProviderConfigMap);
// Collector Registry for prometheus metrics
- CollectorRegistry collectorRegistry = new CollectorRegistry();
+ FunctionCollectorRegistry collectorRegistry = FunctionCollectorRegistry.getDefaultImplementation();
RuntimeUtils.registerDefaultCollectors(collectorRegistry);
containerFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup", pulsarServiceUrl,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index c62a6bf..8c359a9 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.go.GoInstanceConfig;
+import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.utils.FunctionCommon;
@@ -475,7 +476,7 @@ public class RuntimeUtils {
return ObjectMapperFactory.getThreadLocal().convertValue(configMap, functionRuntimeConfigClass);
}
- public static void registerDefaultCollectors(CollectorRegistry registry) {
+ public static void registerDefaultCollectors(FunctionCollectorRegistry registry) {
// Add the JMX exporter for functionality similar to the kafka connect JMX metrics
try {
new JmxCollector("{}").register(registry);
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
index ec9343e..ad4d400 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.InstanceUtils;
+import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
@@ -65,7 +66,7 @@ public class ThreadRuntime implements Runtime {
private PulsarAdmin pulsarAdmin;
private String stateStorageServiceUrl;
private SecretsProvider secretsProvider;
- private CollectorRegistry collectorRegistry;
+ private FunctionCollectorRegistry collectorRegistry;
private String narExtractionDirectory;
private final Optional<ConnectorsManager> connectorsManager;
@@ -77,7 +78,7 @@ public class ThreadRuntime implements Runtime {
PulsarAdmin pulsarAdmin,
String stateStorageServiceUrl,
SecretsProvider secretsProvider,
- CollectorRegistry collectorRegistry,
+ FunctionCollectorRegistry collectorRegistry,
String narExtractionDirectory,
Optional<ConnectorsManager> connectorsManager) {
this.instanceConfig = instanceConfig;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index 1be328b..d1f450c 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceCache;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.InstanceUtils;
+import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeUtils;
@@ -60,7 +61,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
private PulsarAdmin pulsarAdmin;
private String storageServiceUrl;
private SecretsProvider defaultSecretsProvider;
- private CollectorRegistry collectorRegistry;
+ private FunctionCollectorRegistry collectorRegistry;
private String narExtractionDirectory;
private volatile boolean closed;
private SecretsProviderConfigurator secretsProviderConfigurator;
@@ -73,7 +74,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
*/
public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, String storageServiceUrl,
AuthenticationConfig authConfig, SecretsProvider secretsProvider,
- CollectorRegistry collectorRegistry, String narExtractionDirectory,
+ FunctionCollectorRegistry collectorRegistry, String narExtractionDirectory,
ClassLoader rootClassLoader, boolean exposePulsarAdminClientEnabled,
String pulsarWebServiceUrl) throws Exception {
initialize(threadGroupName, Optional.empty(), pulsarServiceUrl, authConfig,
@@ -83,7 +84,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
private void initialize(String threadGroupName, Optional<ThreadRuntimeFactoryConfig.MemoryLimit> memoryLimit, String pulsarServiceUrl, AuthenticationConfig authConfig, String storageServiceUrl,
SecretsProviderConfigurator secretsProviderConfigurator, SecretsProvider secretsProvider,
- CollectorRegistry collectorRegistry, String narExtractionDirectory,
+ FunctionCollectorRegistry collectorRegistry, String narExtractionDirectory,
ClassLoader rootClassLoader, boolean exposePulsarAdminClientEnabled,
String pulsarWebServiceUrl, Optional<ConnectorsManager> connectorsManager) throws PulsarClientException {