You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2021/04/19 22:34:13 UTC

[pulsar] branch master updated: Fix Pulsar Function localrun with multiple instances and metrics server is enabled (#10208)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a89212  Fix Pulsar Function localrun with multiple instances and metrics server is enabled (#10208)
7a89212 is described below

commit 7a89212afabd044c922a039c897df966557dff56
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Mon Apr 19 15:33:08 2021 -0700

    Fix Pulsar Function localrun with multiple instances and metrics server is enabled (#10208)
---
 .../worker/PulsarFunctionLocalRunTest.java         | 255 ++++++++++++++-------
 .../functions/worker/PulsarFunctionTestUtils.java  |   1 -
 .../pulsar/functions/instance/ContextImpl.java     |  23 +-
 .../functions/instance/JavaInstanceRunnable.java   |   8 +-
 .../instance/stats/ComponentStatsManager.java      |   6 +-
 .../instance/stats/FunctionCollectorRegistry.java  |  43 ++++
 .../stats/FunctionCollectorRegistryImpl.java       |  42 ++++
 .../instance/stats/FunctionStatsManager.java       |  92 +++++---
 .../functions/instance/stats/SinkStatsManager.java |  68 ++++--
 .../instance/stats/SourceStatsManager.java         |  68 ++++--
 .../pulsar/functions/instance/ContextImplTest.java |   5 +-
 .../org/apache/pulsar/functions/LocalRunner.java   |  39 ++--
 .../functions/runtime/JavaInstanceStarter.java     |   3 +-
 .../pulsar/functions/runtime/RuntimeUtils.java     |   3 +-
 .../functions/runtime/thread/ThreadRuntime.java    |   5 +-
 .../runtime/thread/ThreadRuntimeFactory.java       |   7 +-
 16 files changed, 473 insertions(+), 195 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index 5dc5197..1a00043 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -74,6 +74,8 @@ import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.policies.data.PublisherStats;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TopicStats;
@@ -394,7 +396,7 @@ public class PulsarFunctionLocalRunTest {
      *
      * @throws Exception
      */
-    private void testE2EPulsarFunctionLocalRun(String jarFilePathUrl) throws Exception {
+    private void testE2EPulsarFunctionLocalRun(String jarFilePathUrl, int parallelism) throws Exception {
 
         final String namespacePortion = "io";
         final String replNamespace = tenant + "/" + namespacePortion;
@@ -417,6 +419,7 @@ public class PulsarFunctionLocalRunTest {
         functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
 
         functionConfig.setJar(jarFilePathUrl);
+        functionConfig.setParallelism(parallelism);
         int metricsPort = FunctionCommon.findAvailablePort();
         @Cleanup
         LocalRunner localRunner = LocalRunner.builder()
@@ -431,15 +434,25 @@ public class PulsarFunctionLocalRunTest {
                 .brokerServiceUrl(pulsar.getBrokerServiceUrlTls()).build();
         localRunner.start(false);
 
-        retryStrategically((test) -> {
+        Assert.assertTrue(retryStrategically((test) -> {
             try {
-                TopicStats stats = admin.topics().getStats(sourceTopic);
-                return stats.subscriptions.get(subscriptionName) != null
-                        && !stats.subscriptions.get(subscriptionName).consumers.isEmpty();
+
+                boolean result = false;
+                TopicStats topicStats = admin.topics().getStats(sourceTopic);
+                if (topicStats.subscriptions.containsKey(subscriptionName)
+                        && topicStats.subscriptions.get(subscriptionName).consumers.size() == parallelism) {
+                    for (ConsumerStats consumerStats : topicStats.subscriptions.get(subscriptionName).consumers) {
+                        result = consumerStats.availablePermits == 1000
+                                && consumerStats.metadata != null
+                                && consumerStats.metadata.containsKey("id")
+                                && consumerStats.metadata.get("id").equals(String.format("%s/%s/%s", tenant, namespacePortion, functionName));
+                    }
+                }
+                return result;
             } catch (PulsarAdminException e) {
                 return false;
             }
-        }, 50, 150);
+        }, 50, 150));
         // validate pulsar sink consumer has started on the topic
         TopicStats stats = admin.topics().getStats(sourceTopic);
         assertTrue(stats.subscriptions.get(subscriptionName) != null
@@ -475,16 +488,31 @@ public class PulsarFunctionLocalRunTest {
         String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(metricsPort);
         log.info("prometheus metrics: {}", prometheusMetrics);
 
-        Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
-        assertFalse(metrics.isEmpty());
-
-        PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_function_processed_successfully_total");
-        assertEquals(m.tags.get("cluster"), config.getClusterName());
-        assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
-        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
-        assertEquals(m.value, 5.0);
+        Map<String, PulsarFunctionTestUtils.Metric> metricsMap = new HashMap<>();
+        Arrays.asList(prometheusMetrics.split("\n")).forEach(line -> {
+            if (line.startsWith("pulsar_function_processed_successfully_total")) {
+                Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(line);
+                assertFalse(metrics.isEmpty());
+                PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_function_processed_successfully_total");
+                if (m != null) {
+                    metricsMap.put(m.tags.get("instance_id"), m);
+                }
+            }
+        });
+        Assert.assertEquals(metricsMap.size(), parallelism);
+
+        double totalMsgRecv = 0.0;
+        for (int i = 0; i < parallelism; i++) {
+            PulsarFunctionTestUtils.Metric m = metricsMap.get(String.valueOf(i));
+            Assert.assertNotNull(m);
+            assertEquals(m.tags.get("cluster"), config.getClusterName());
+            assertEquals(m.tags.get("instance_id"), String.valueOf(i));
+            assertEquals(m.tags.get("name"), functionName);
+            assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+            assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+            totalMsgRecv += m.value;
+        }
+        Assert.assertEquals(totalMsgRecv, totalMsgs);
 
         // stop functions
         localRunner.stop();
@@ -523,6 +551,10 @@ public class PulsarFunctionLocalRunTest {
         }
     }
 
+    private void testE2EPulsarFunctionLocalRun(String jarFilePathUrl) throws Exception {
+        testE2EPulsarFunctionLocalRun(jarFilePathUrl, 1);
+    }
+
     private void testAvroFunctionLocalRun(String jarFilePathUrl) throws Exception {
 
         final String namespacePortion = "io";
@@ -665,7 +697,12 @@ public class PulsarFunctionLocalRunTest {
         testE2EPulsarFunctionLocalRun(fileServer.getUrl("/pulsar-functions-api-examples.jar"));
     }
 
-    private void testPulsarSourceLocalRun(String jarFilePathUrl) throws Exception {
+    @Test(timeOut = 40000)
+    public void testE2EPulsarFunctionLocalRunMultipleInstances() throws Throwable {
+        runWithPulsarFunctionsClassLoader(() -> testE2EPulsarFunctionLocalRun(null, 2));
+    }
+
+    private void testPulsarSourceLocalRun(String jarFilePathUrl, int parallelism) throws Exception {
         final String namespacePortion = "io";
         final String replNamespace = tenant + "/" + namespacePortion;
         final String sinkTopic = "persistent://" + replNamespace + "/output";
@@ -680,6 +717,8 @@ public class PulsarFunctionLocalRunTest {
         }
 
         sourceConfig.setArchive(jarFilePathUrl);
+        sourceConfig.setParallelism(parallelism);
+        int metricsPort = FunctionCommon.findAvailablePort();
         @Cleanup
         LocalRunner localRunner = LocalRunner.builder()
                 .sourceConfig(sourceConfig)
@@ -691,55 +730,84 @@ public class PulsarFunctionLocalRunTest {
                 .tlsHostNameVerificationEnabled(false)
                 .brokerServiceUrl(pulsar.getBrokerServiceUrlTls())
                 .connectorsDirectory(workerConfig.getConnectorsDirectory())
+                .metricsPortStart(metricsPort)
                 .build();
 
         localRunner.start(false);
 
-        retryStrategically((test) -> {
+        Assert.assertTrue(retryStrategically((test) -> {
             try {
-                return (admin.topics().getStats(sinkTopic).publishers.size() == 1);
+                return admin.topics().getStats(sinkTopic).publishers.size() == parallelism;
             } catch (PulsarAdminException e) {
                 return false;
             }
-        }, 10, 150);
+        }, 10, 150));
 
-        retryStrategically((test) -> {
+        Assert.assertTrue(retryStrategically((test) -> {
             try {
+                boolean result = false;
                 TopicStats sourceStats = admin.topics().getStats(sinkTopic);
-                return sourceStats.publishers.size() == 1
-                        && sourceStats.publishers.get(0).metadata != null
-                        && sourceStats.publishers.get(0).metadata.containsKey("id")
-                        && sourceStats.publishers.get(0).metadata.get("id").equals(String.format("%s/%s/%s", tenant, namespacePortion, sourceName));
+                if (sourceStats.publishers.size() == parallelism) {
+                    for (PublisherStats publisher : sourceStats.publishers) {
+                        result = publisher.metadata != null
+                                && publisher.metadata.containsKey("id")
+                                && publisher.metadata.get("id").equals(String.format("%s/%s/%s", tenant, namespacePortion, sourceName));
+                    }
+                }
+
+                return result;
             } catch (PulsarAdminException e) {
                 return false;
             }
-        }, 50, 150);
-
-        TopicStats sourceStats = admin.topics().getStats(sinkTopic);
-        assertEquals(sourceStats.publishers.size(), 1);
-        assertNotNull(sourceStats.publishers.get(0).metadata);
-        assertTrue(sourceStats.publishers.get(0).metadata.containsKey("id"));
-        assertEquals(sourceStats.publishers.get(0).metadata.get("id"), String.format("%s/%s/%s", tenant, namespacePortion, sourceName));
+        }, 50, 150));
 
-        retryStrategically((test) -> {
+        Assert.assertTrue(retryStrategically((test) -> {
             try {
-                return (admin.topics().getStats(sinkTopic).publishers.size() == 1)
+                return (admin.topics().getStats(sinkTopic).publishers.size() == parallelism)
                         && (admin.topics().getInternalStats(sinkTopic, false).numberOfEntries > 4);
             } catch (PulsarAdminException e) {
                 return false;
             }
-        }, 50, 150);
-        assertEquals(admin.topics().getStats(sinkTopic).publishers.size(), 1);
+        }, 50, 150));
+        assertEquals(admin.topics().getStats(sinkTopic).publishers.size(), parallelism);
+
+        // validate prometheus metrics
+        String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(metricsPort);
+        log.info("prometheus metrics: {}", prometheusMetrics);
+
+        Map<String, PulsarFunctionTestUtils.Metric> metricsMap = new HashMap<>();
+        Arrays.asList(prometheusMetrics.split("\n")).forEach(line -> {
+            if (line.startsWith("pulsar_source_written_total")) {
+                Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(line);
+                assertFalse(metrics.isEmpty());
+                PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_source_written_total");
+                if (m != null) {
+                    metricsMap.put(m.tags.get("instance_id"), m);
+                }
+            }
+        });
+        Assert.assertEquals(metricsMap.size(), parallelism);
+
+        for (int i = 0; i < parallelism; i++) {
+            PulsarFunctionTestUtils.Metric m = metricsMap.get(String.valueOf(i));
+            Assert.assertNotNull(m);
+            assertEquals(m.tags.get("cluster"), config.getClusterName());
+            assertEquals(m.tags.get("instance_id"), String.valueOf(i));
+            assertEquals(m.tags.get("name"), sourceName);
+            assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+            assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+            assertTrue(m.value > 0.0);
+        }
 
         localRunner.stop();
 
-        retryStrategically((test) -> {
+        Assert.assertTrue(retryStrategically((test) -> {
             try {
                 return (admin.topics().getStats(sinkTopic).publishers.size() == 0);
             } catch (PulsarAdminException e) {
                 return e.getStatusCode() == 404;
             }
-        }, 10, 150);
+        }, 10, 150));
 
         try {
             assertEquals(admin.topics().getStats(sinkTopic).publishers.size(), 0);
@@ -750,6 +818,10 @@ public class PulsarFunctionLocalRunTest {
         }
     }
 
+    private void testPulsarSourceLocalRun(String jarFilePathUrl) throws Exception {
+        testPulsarSourceLocalRun(jarFilePathUrl, 1);
+    }
+
     @Test(timeOut = 20000, groups = "builtin")
     public void testPulsarSourceStatsBuiltin() throws Exception {
         testPulsarSourceLocalRun(String.format("%s://data-generator", Utils.BUILTIN));
@@ -771,7 +843,12 @@ public class PulsarFunctionLocalRunTest {
         testPulsarSourceLocalRun(fileServer.getUrl("/pulsar-io-data-generator.nar"));
     }
 
-    private void testPulsarSinkLocalRun(String jarFilePathUrl) throws Exception {
+    @Test(timeOut = 40000)
+    public void testPulsarSourceLocalRunMultipleInstances() throws Throwable {
+        runWithNarClassLoader(() -> testPulsarSourceLocalRun(null, 2));
+    }
+
+    private void testPulsarSinkLocalRun(String jarFilePathUrl, int parallelism) throws Exception {
         final String namespacePortion = "io";
         final String replNamespace = tenant + "/" + namespacePortion;
         final String sourceTopic = "persistent://" + replNamespace + "/input";
@@ -794,6 +871,7 @@ public class PulsarFunctionLocalRunTest {
         }
 
         sinkConfig.setArchive(jarFilePathUrl);
+        sinkConfig.setParallelism(parallelism);
         int metricsPort = FunctionCommon.findAvailablePort();
         @Cleanup
         LocalRunner localRunner = LocalRunner.builder()
@@ -811,58 +889,70 @@ public class PulsarFunctionLocalRunTest {
 
         localRunner.start(false);
 
-        retryStrategically((test) -> {
+        Assert.assertTrue(retryStrategically((test) -> {
             try {
+                boolean result = false;
                 TopicStats topicStats = admin.topics().getStats(sourceTopic);
-
-                return topicStats.subscriptions.containsKey(subscriptionName)
-                        && topicStats.subscriptions.get(subscriptionName).consumers.size() == 1
-                        && topicStats.subscriptions.get(subscriptionName).consumers.get(0).availablePermits == 1000;
-
+                if (topicStats.subscriptions.containsKey(subscriptionName)
+                        && topicStats.subscriptions.get(subscriptionName).consumers.size() == parallelism) {
+                    for (ConsumerStats consumerStats : topicStats.subscriptions.get(subscriptionName).consumers) {
+                        result = consumerStats.availablePermits == 1000;
+                    }
+                }
+                return result;
             } catch (PulsarAdminException e) {
                 return false;
             }
-        }, 20, 150);
-
-        TopicStats topicStats = admin.topics().getStats(sourceTopic);
-        assertEquals(topicStats.subscriptions.size(), 1);
-        assertTrue(topicStats.subscriptions.containsKey(subscriptionName));
-        assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
-        assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.get(0).availablePermits, 1000);
+        }, 20, 150));
 
         int totalMsgs = 10;
         for (int i = 0; i < totalMsgs; i++) {
             String data = "my-message-" + i;
             producer.newMessage().property(propertyKey, propertyValue).value(data).send();
         }
-        retryStrategically((test) -> {
+        Assert.assertTrue(retryStrategically((test) -> {
             try {
                 SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
                 return subStats.unackedMessages == 0 && subStats.msgThroughputOut == totalMsgs;
             } catch (PulsarAdminException e) {
                 return false;
             }
-        }, 5, 200);
+        }, 5, 200));
 
         // validate prometheus metrics
         String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(metricsPort);
         log.info("prometheus metrics: {}", prometheusMetrics);
 
-        Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
-        assertFalse(metrics.isEmpty());
-
-        PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_written_total");
-        assertEquals(m.tags.get("cluster"), config.getClusterName());
-        assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), sinkName);
-        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
-        assertEquals(m.value, 10.0);
+        Map<String, PulsarFunctionTestUtils.Metric> metricsMap = new HashMap<>();
+        Arrays.asList(prometheusMetrics.split("\n")).forEach(line -> {
+            if (line.startsWith("pulsar_sink_written_total")) {
+                Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(line);
+                assertFalse(metrics.isEmpty());
+                PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_written_total");
+                if (m != null) {
+                    metricsMap.put(m.tags.get("instance_id"), m);
+                }
+            }
+        });
+        Assert.assertEquals(metricsMap.size(), parallelism);
+
+        double totalNumRecvMsg = 0;
+        for (int i = 0; i < parallelism; i++) {
+            PulsarFunctionTestUtils.Metric m = metricsMap.get(String.valueOf(i));
+            Assert.assertNotNull(m);
+            assertEquals(m.tags.get("cluster"), config.getClusterName());
+            assertEquals(m.tags.get("instance_id"), String.valueOf(i));
+            assertEquals(m.tags.get("name"), sinkName);
+            assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+            assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
+            totalNumRecvMsg += m.value;
+        }
+        assertEquals(totalNumRecvMsg, totalMsgs);
 
         // stop sink
         localRunner.stop();
 
-        retryStrategically((test) -> {
+        Assert.assertTrue(retryStrategically((test) -> {
             try {
                 TopicStats stats = admin.topics().getStats(sourceTopic);
                 return stats.subscriptions.get(subscriptionName) != null
@@ -870,14 +960,18 @@ public class PulsarFunctionLocalRunTest {
             } catch (PulsarAdminException e) {
                 return false;
             }
-        }, 20, 150);
+        }, 20, 150));
 
-        topicStats = admin.topics().getStats(sourceTopic);
+        TopicStats topicStats = admin.topics().getStats(sourceTopic);
         assertTrue(topicStats.subscriptions.get(subscriptionName) != null
                 && topicStats.subscriptions.get(subscriptionName).consumers.isEmpty());
 
     }
 
+    private void testPulsarSinkLocalRun(String jarFilePathUrl) throws Exception {
+        testPulsarSinkLocalRun(jarFilePathUrl, 1); // single-instance run of the sink test, not the source test
+    }
+
     @Test(timeOut = 20000, groups = "builtin")
     public void testPulsarSinkStatsBuiltin() throws Exception {
         testPulsarSinkLocalRun(String.format("%s://data-generator", Utils.BUILTIN));
@@ -888,6 +982,22 @@ public class PulsarFunctionLocalRunTest {
         runWithNarClassLoader(() -> testPulsarSinkLocalRun(null));
     }
 
+    @Test(timeOut = 20000)
+    public void testPulsarSinkStatsWithFile() throws Exception {
+        String jarFilePathUrl = getPulsarIODataGeneratorNar().toURI().toString();
+        testPulsarSinkLocalRun(jarFilePathUrl);
+    }
+
+    @Test(timeOut = 40000)
+    public void testPulsarSinkStatsWithUrl() throws Exception {
+        testPulsarSinkLocalRun(fileServer.getUrl("/pulsar-io-data-generator.nar"));
+    }
+
+    @Test(timeOut = 40000)
+    public void testPulsarSinkStatsMultipleInstances() throws Throwable {
+        runWithNarClassLoader(() -> testPulsarSinkLocalRun(null, 2));
+    }
+
     private void runWithNarClassLoader(Assert.ThrowingRunnable throwingRunnable) throws Throwable {
         ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
         try (NarClassLoader classLoader = NarClassLoader.getFromArchive(getPulsarIODataGeneratorNar(), Collections.emptySet(), originalClassLoader, NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR)) {
@@ -909,15 +1019,4 @@ public class PulsarFunctionLocalRunTest {
             Thread.currentThread().setContextClassLoader(originalClassLoader);
         }
     }
-
-    @Test(timeOut = 20000)
-    public void testPulsarSinkStatsWithFile() throws Exception {
-        String jarFilePathUrl = getPulsarIODataGeneratorNar().toURI().toString();
-        testPulsarSinkLocalRun(jarFilePathUrl);
-    }
-
-    @Test(timeOut = 40000)
-    public void testPulsarSinkStatsWithUrl() throws Exception {
-        testPulsarSinkLocalRun(fileServer.getUrl("/pulsar-io-data-generator.nar"));
-    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestUtils.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestUtils.java
index 8f5316c..46aa37d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestUtils.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestUtils.java
@@ -106,7 +106,6 @@ public class PulsarFunctionTestUtils {
             parsed.put(name, m);
         });
 
-        log.info("parsed metrics: {}", parsed);
         return parsed;
     }
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index e927332..357bbcf 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -56,6 +56,7 @@ import org.apache.pulsar.functions.api.StateStore;
 import org.apache.pulsar.functions.instance.state.DefaultStateStore;
 import org.apache.pulsar.functions.instance.state.StateManager;
 import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
+import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
 import org.apache.pulsar.functions.instance.stats.FunctionStatsManager;
 import org.apache.pulsar.functions.instance.stats.SinkStatsManager;
 import org.apache.pulsar.functions.instance.stats.SourceStatsManager;
@@ -115,7 +116,7 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
     private final Function.FunctionDetails.ComponentType componentType;
 
     public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
-                       SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels,
+                       SecretsProvider secretsProvider, FunctionCollectorRegistry collectorRegistry, String[] metricsLabels,
                        Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager,
                        StateManager stateManager, PulsarAdmin pulsarAdmin) {
         this.config = config;
@@ -172,15 +173,17 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
             default:
                 throw new RuntimeException("Unknown component type: " + componentType);
         }
-        this.userMetricsSummary = Summary.build()
-                .name(prefix + ComponentStatsManager.USER_METRIC_PREFIX)
-                .help("User defined metric.")
-                .labelNames(userMetricsLabelNames)
-                .quantile(0.5, 0.01)
-                .quantile(0.9, 0.01)
-                .quantile(0.99, 0.01)
-                .quantile(0.999, 0.01)
-                .register(collectorRegistry);
+        this.userMetricsSummary = collectorRegistry.registerIfNotExist(
+                prefix + ComponentStatsManager.USER_METRIC_PREFIX,
+                Summary.build()
+                        .name(prefix + ComponentStatsManager.USER_METRIC_PREFIX)
+                        .help("User defined metric.")
+                        .labelNames(userMetricsLabelNames)
+                        .quantile(0.5, 0.01)
+                        .quantile(0.9, 0.01)
+                        .quantile(0.99, 0.01)
+                        .quantile(0.999, 0.01)
+                        .create());
         this.componentType = componentType;
         this.stateManager = stateManager;
         this.defaultStateStore = (DefaultStateStore) stateManager.getStore(
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index d761ccb..36f8608 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.functions.instance.state.StateManager;
 import org.apache.pulsar.functions.instance.state.StateStoreContextImpl;
 import org.apache.pulsar.functions.instance.state.StateStoreProvider;
 import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
+import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -106,7 +107,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
 
     private final SecretsProvider secretsProvider;
 
-    private CollectorRegistry collectorRegistry;
+    private FunctionCollectorRegistry collectorRegistry;
     private final String[] metricsLabels;
 
     private InstanceCache instanceCache;
@@ -130,14 +131,13 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                                 PulsarAdmin pulsarAdmin,
                                 String stateStorageServiceUrl,
                                 SecretsProvider secretsProvider,
-                                CollectorRegistry collectorRegistry,
+                                FunctionCollectorRegistry collectorRegistry,
                                 ClassLoader functionClassLoader) {
         this.instanceConfig = instanceConfig;
         this.client = (PulsarClientImpl) pulsarClient;
         this.pulsarAdmin = pulsarAdmin;
         this.stateStorageServiceUrl = stateStorageServiceUrl;
         this.secretsProvider = secretsProvider;
-        this.collectorRegistry = collectorRegistry;
         this.functionClassLoader = functionClassLoader;
         this.metricsLabels = new String[]{
                 instanceConfig.getFunctionDetails().getTenant(),
@@ -171,7 +171,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         this.instanceCache = InstanceCache.getInstanceCache();
 
         if (this.collectorRegistry == null) {
-            this.collectorRegistry = new CollectorRegistry();
+            this.collectorRegistry = FunctionCollectorRegistry.getDefaultImplementation();
         }
         this.stats = ComponentStatsManager.getStatsManager(this.collectorRegistry, this.metricsLabels,
                 this.instanceCache.getScheduledExecutorService(),
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
index cbdcc0f..d822aa0 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
@@ -38,7 +38,7 @@ public abstract class ComponentStatsManager implements AutoCloseable {
 
     protected ScheduledFuture<?> scheduledFuture;
 
-    protected final CollectorRegistry collectorRegistry;
+    protected final FunctionCollectorRegistry collectorRegistry;
 
     protected final EvictingQueue EMPTY_QUEUE = EvictingQueue.create(0);
 
@@ -53,7 +53,7 @@ public abstract class ComponentStatsManager implements AutoCloseable {
         exceptionMetricsLabelNames[metricsLabelNames.length] = "error";
     }
 
-    public static ComponentStatsManager getStatsManager(CollectorRegistry collectorRegistry,
+    public static ComponentStatsManager getStatsManager(FunctionCollectorRegistry collectorRegistry,
                                   String[] metricsLabels,
                                   ScheduledExecutorService scheduledExecutorService,
                                   Function.FunctionDetails.ComponentType componentType) {
@@ -69,7 +69,7 @@ public abstract class ComponentStatsManager implements AutoCloseable {
         }
     }
 
-    public ComponentStatsManager(CollectorRegistry collectorRegistry,
+    public ComponentStatsManager(FunctionCollectorRegistry collectorRegistry,
                          String[] metricsLabels,
                          ScheduledExecutorService scheduledExecutorService) {
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionCollectorRegistry.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionCollectorRegistry.java
new file mode 100644
index 0000000..d297639
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionCollectorRegistry.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.instance.stats;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+
+/**
+ * Internal representation of Prometheus Collector Registry
+ */
+public abstract class FunctionCollectorRegistry extends CollectorRegistry {
+    public static FunctionCollectorRegistry getDefaultImplementation() {
+        return new FunctionCollectorRegistryImpl();
+    }
+
+    /**
+     * Register a metric if it does not yet exist.  If it does exist, then return the existing metric.
+     * Currently, only needed by the LocalRunner when running instances as threads and exposing metrics via an HTTP server.
+     * This method helps resolve the conflict in which multiple instances within the LocalRunner process try to register the same metric.
+     * @param metricName the name of the metric
+     * @param collector the metric object e.g. Counter, Gauge, etc.
+     * @param <T> the concrete collector type, so callers get back the same type they passed in
+     * @return If the metric with the name `metricName` already exists, return the existing metric object.  If not, return the newly registered `collector`
+     */
+    public abstract <T extends Collector> T registerIfNotExist(String metricName, T collector);
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionCollectorRegistryImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionCollectorRegistryImpl.java
new file mode 100644
index 0000000..9f003a4
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionCollectorRegistryImpl.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.instance.stats;
+
+import io.prometheus.client.Collector;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class FunctionCollectorRegistryImpl extends FunctionCollectorRegistry {
+    private final Map<String, Collector> namesToCollectors = new HashMap<>();
+
+    /** Thread-safe: registers {@code collector} under {@code metricName} unless one is already recorded; returns the recorded collector. */
+    @Override
+    public synchronized Collector registerIfNotExist(String metricName, Collector collector) {
+        Collector existingCollector = namesToCollectors.get(metricName);
+        if (existingCollector != null) {
+            return existingCollector;
+        }
+        // Register before recording: if register() throws, the map must not keep a collector the registry rejected.
+        super.register(collector);
+        namesToCollectors.put(metricName, collector);
+        return collector;
+    }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
index f02b850..08ea9ea 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
@@ -119,33 +119,41 @@ public class FunctionStatsManager extends ComponentStatsManager{
 
     private final RateLimiter sysExceptionRateLimiter;
 
-    public FunctionStatsManager(CollectorRegistry collectorRegistry,
+    public FunctionStatsManager(FunctionCollectorRegistry collectorRegistry,
                                 String[] metricsLabels,
                                 ScheduledExecutorService scheduledExecutorService) {
         super(collectorRegistry, metricsLabels, scheduledExecutorService);
 
-        statTotalProcessedSuccessfully = Counter.build()
+        statTotalProcessedSuccessfully = collectorRegistry.registerIfNotExist(
+                PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL,
+                Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL)
                 .help("Total number of messages processed successfully.")
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statTotalProcessedSuccessfully = statTotalProcessedSuccessfully.labels(metricsLabels);
 
-        statTotalSysExceptions = Counter.build()
+        statTotalSysExceptions = collectorRegistry.registerIfNotExist(
+                PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL ,
+                Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL)
                 .help("Total number of system exceptions.")
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statTotalSysExceptions = statTotalSysExceptions.labels(metricsLabels);
 
-        statTotalUserExceptions = Counter.build()
+        statTotalUserExceptions = collectorRegistry.registerIfNotExist(
+                PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL,
+                Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL)
                 .help("Total number of user exceptions.")
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statTotalUserExceptions = statTotalUserExceptions.labels(metricsLabels);
 
-        statProcessLatency = Summary.build()
+        statProcessLatency = collectorRegistry.registerIfNotExist(
+                PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS,
+                Summary.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS)
                 .help("Process latency in milliseconds.")
                 .quantile(0.5, 0.01)
@@ -153,45 +161,57 @@ public class FunctionStatsManager extends ComponentStatsManager{
                 .quantile(0.99, 0.01)
                 .quantile(0.999, 0.01)
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statProcessLatency = statProcessLatency.labels(metricsLabels);
 
-        statlastInvocation = Gauge.build()
+        statlastInvocation = collectorRegistry.registerIfNotExist(
+                PULSAR_FUNCTION_METRICS_PREFIX + LAST_INVOCATION,
+                Gauge.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + LAST_INVOCATION)
                 .help("The timestamp of the last invocation of the function.")
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statlastInvocation = statlastInvocation.labels(metricsLabels);
 
-        statTotalRecordsReceived = Counter.build()
+        statTotalRecordsReceived = collectorRegistry.registerIfNotExist(
+                PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL,
+                Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL)
                 .help("Total number of messages received from source.")
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statTotalRecordsReceived = statTotalRecordsReceived.labels(metricsLabels);
 
-        statTotalProcessedSuccessfully1min = Counter.build()
+        statTotalProcessedSuccessfully1min = collectorRegistry.registerIfNotExist(
+                PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL_1min,
+                Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL_1min)
                 .help("Total number of messages processed successfully in the last 1 minute.")
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statTotalProcessedSuccessfully1min = statTotalProcessedSuccessfully1min.labels(metricsLabels);
 
-        statTotalSysExceptions1min = Counter.build()
+        statTotalSysExceptions1min = collectorRegistry.registerIfNotExist(
+                PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min,
+                Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min)
                 .help("Total number of system exceptions in the last 1 minute.")
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statTotalSysExceptions1min = statTotalSysExceptions1min.labels(metricsLabels);
 
-        statTotalUserExceptions1min = Counter.build()
+        statTotalUserExceptions1min = collectorRegistry.registerIfNotExist(
+                PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL_1min,
+                Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL_1min)
                 .help("Total number of user exceptions in the last 1 minute.")
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statTotalUserExceptions1min = statTotalUserExceptions1min.labels(metricsLabels);
 
-        statProcessLatency1min = Summary.build()
+        statProcessLatency1min = collectorRegistry.registerIfNotExist(
+                PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS_1min,
+                Summary.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS_1min)
                 .help("Process latency in milliseconds in the last 1 minute.")
                 .quantile(0.5, 0.01)
@@ -199,38 +219,48 @@ public class FunctionStatsManager extends ComponentStatsManager{
                 .quantile(0.99, 0.01)
                 .quantile(0.999, 0.01)
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statProcessLatency1min = statProcessLatency1min.labels(metricsLabels);
 
-        statTotalRecordsReceived1min = Counter.build()
+        statTotalRecordsReceived1min = collectorRegistry.registerIfNotExist(
+                PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL_1min,
+                Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL_1min)
                 .help("Total number of messages received from source in the last 1 minute.")
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statTotalRecordsReceived1min = statTotalRecordsReceived1min.labels(metricsLabels);
 
-        userExceptions = Gauge.build()
+        userExceptions = collectorRegistry.registerIfNotExist(
+                PULSAR_FUNCTION_METRICS_PREFIX + "user_exception",
+                Gauge.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + "user_exception")
                 .labelNames(exceptionMetricsLabelNames)
                 .help("Exception from user code.")
-                .register(collectorRegistry);
-        sysExceptions = Gauge.build()
+                .create());
+        sysExceptions = collectorRegistry.registerIfNotExist(
+                PULSAR_FUNCTION_METRICS_PREFIX + "system_exception",
+                Gauge.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + "system_exception")
                 .labelNames(exceptionMetricsLabelNames)
                 .help("Exception from system code.")
-                .register(collectorRegistry);
+                .create());
 
-        sourceExceptions = Gauge.build()
+        sourceExceptions = collectorRegistry.registerIfNotExist(
+                PULSAR_FUNCTION_METRICS_PREFIX + "source_exception",
+                Gauge.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + "source_exception")
                 .labelNames(exceptionMetricsLabelNames)
                 .help("Exception from source.")
-                .register(collectorRegistry);
+                .create());
 
-        sinkExceptions = Gauge.build()
+        sinkExceptions = collectorRegistry.registerIfNotExist(
+                PULSAR_FUNCTION_METRICS_PREFIX + "sink_exception",
+                Gauge.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + "sink_exception")
                 .labelNames(exceptionMetricsLabelNames)
                 .help("Exception from sink.")
-                .register(collectorRegistry);
+                .create());
 
         userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
         sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
index 401aa34..46999cd 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
@@ -95,84 +95,106 @@ public class SinkStatsManager extends ComponentStatsManager {
     private final RateLimiter sinkExceptionRateLimiter;
 
 
-    public SinkStatsManager(CollectorRegistry collectorRegistry, String[] metricsLabels, ScheduledExecutorService
+    public SinkStatsManager(FunctionCollectorRegistry collectorRegistry, String[] metricsLabels, ScheduledExecutorService
             scheduledExecutorService) {
         super(collectorRegistry, metricsLabels, scheduledExecutorService);
 
-        statTotalRecordsReceived = Counter.build()
+        statTotalRecordsReceived = collectorRegistry.registerIfNotExist(
+                PULSAR_SINK_METRICS_PREFIX + RECEIVED_TOTAL,
+                Counter.build()
                 .name(PULSAR_SINK_METRICS_PREFIX + RECEIVED_TOTAL)
                 .help("Total number of records sink has received from Pulsar topic(s).")
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statTotalRecordsReceived = statTotalRecordsReceived.labels(metricsLabels);
 
-        statTotalSysExceptions = Counter.build()
+        statTotalSysExceptions = collectorRegistry.registerIfNotExist(
+                PULSAR_SINK_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL,
+                Counter.build()
                 .name(PULSAR_SINK_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL)
                 .help("Total number of system exceptions.")
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statTotalSysExceptions = statTotalSysExceptions.labels(metricsLabels);
 
-        statTotalSinkExceptions = Counter.build()
+        statTotalSinkExceptions = collectorRegistry.registerIfNotExist(
+                PULSAR_SINK_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL,
+                Counter.build()
                 .name(PULSAR_SINK_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL)
                 .help("Total number of sink exceptions.")
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statTotalSinkExceptions = statTotalSinkExceptions.labels(metricsLabels);
 
-        statTotalWritten = Counter.build()
+        statTotalWritten = collectorRegistry.registerIfNotExist(
+                PULSAR_SINK_METRICS_PREFIX + WRITTEN_TOTAL,
+                Counter.build()
                 .name(PULSAR_SINK_METRICS_PREFIX + WRITTEN_TOTAL)
                 .help("Total number of records processed by sink.")
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statTotalWritten = statTotalWritten.labels(metricsLabels);
 
-        statlastInvocation = Gauge.build()
+        statlastInvocation = collectorRegistry.registerIfNotExist(
+                PULSAR_SINK_METRICS_PREFIX + LAST_INVOCATION,
+                Gauge.build()
                 .name(PULSAR_SINK_METRICS_PREFIX + LAST_INVOCATION)
                 .help("The timestamp of the last invocation of the sink.")
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statlastInvocation = statlastInvocation.labels(metricsLabels);
 
-        statTotalRecordsReceived1min = Counter.build()
+        statTotalRecordsReceived1min = collectorRegistry.registerIfNotExist(
+                PULSAR_SINK_METRICS_PREFIX + RECEIVED_TOTAL_1min,
+                Counter.build()
                 .name(PULSAR_SINK_METRICS_PREFIX + RECEIVED_TOTAL_1min)
                 .help("Total number of messages sink has received from Pulsar topic(s) in the last 1 minute.")
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statTotalRecordsReceived1min = statTotalRecordsReceived1min.labels(metricsLabels);
 
-        statTotalSysExceptions1min = Counter.build()
+        statTotalSysExceptions1min = collectorRegistry.registerIfNotExist(
+                PULSAR_SINK_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min,
+                Counter.build()
                 .name(PULSAR_SINK_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min)
                 .help("Total number of system exceptions in the last 1 minute.")
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statTotalSysExceptions1min = statTotalSysExceptions1min.labels(metricsLabels);
 
-        statTotalSinkExceptions1min = Counter.build()
+        statTotalSinkExceptions1min = collectorRegistry.registerIfNotExist(
+                PULSAR_SINK_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL_1min,
+                Counter.build()
                 .name(PULSAR_SINK_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL_1min)
                 .help("Total number of sink exceptions in the last 1 minute.")
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statTotalSinkExceptions1min = statTotalSinkExceptions1min.labels(metricsLabels);
 
-        statTotalWritten1min = Counter.build()
+        statTotalWritten1min = collectorRegistry.registerIfNotExist(
+                PULSAR_SINK_METRICS_PREFIX + WRITTEN_TOTAL_1min,
+                Counter.build()
                 .name(PULSAR_SINK_METRICS_PREFIX + WRITTEN_TOTAL_1min)
                 .help("Total number of records processed by sink the last 1 minute.")
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statTotalWritten1min = statTotalWritten1min.labels(metricsLabels);
 
-        sysExceptions = Gauge.build()
+        sysExceptions = collectorRegistry.registerIfNotExist(
+                PULSAR_SINK_METRICS_PREFIX + "system_exception",
+                Gauge.build()
                 .name(PULSAR_SINK_METRICS_PREFIX + "system_exception")
                 .labelNames(exceptionMetricsLabelNames)
                 .help("Exception from system code.")
-                .register(collectorRegistry);
+                .create());
 
-        sinkExceptions = Gauge.build()
+        sinkExceptions = collectorRegistry.registerIfNotExist(
+                PULSAR_SINK_METRICS_PREFIX + "sink_exception",
+                Gauge.build()
                 .name(PULSAR_SINK_METRICS_PREFIX + "sink_exception")
                 .labelNames(exceptionMetricsLabelNames)
                 .help("Exception from sink.")
-                .register(collectorRegistry);
+                .create());
 
         sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
         sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
index 287240c..e79e0b5 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
@@ -94,84 +94,106 @@ public class SourceStatsManager extends ComponentStatsManager {
 
     protected final RateLimiter sourceExceptionRateLimiter;
 
-    public SourceStatsManager(CollectorRegistry collectorRegistry, String[] metricsLabels, ScheduledExecutorService
+    public SourceStatsManager(FunctionCollectorRegistry collectorRegistry, String[] metricsLabels, ScheduledExecutorService
             scheduledExecutorService) {
         super(collectorRegistry, metricsLabels, scheduledExecutorService);
 
-        statTotalRecordsReceived = Counter.build()
+        statTotalRecordsReceived = collectorRegistry.registerIfNotExist(
+                PULSAR_SOURCE_METRICS_PREFIX + RECEIVED_TOTAL,
+                Counter.build()
                 .name(PULSAR_SOURCE_METRICS_PREFIX + RECEIVED_TOTAL)
                 .help("Total number of records received from source.")
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statTotalRecordsReceived = statTotalRecordsReceived.labels(metricsLabels);
 
-        statTotalSysExceptions = Counter.build()
+        statTotalSysExceptions = collectorRegistry.registerIfNotExist(
+                PULSAR_SOURCE_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL,
+                Counter.build()
                 .name(PULSAR_SOURCE_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL)
                 .help("Total number of system exceptions.")
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statTotalSysExceptions = statTotalSysExceptions.labels(metricsLabels);
 
-        statTotalSourceExceptions = Counter.build()
+        statTotalSourceExceptions = collectorRegistry.registerIfNotExist(
+                PULSAR_SOURCE_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL,
+                Counter.build()
                 .name(PULSAR_SOURCE_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL)
                 .help("Total number of source exceptions.")
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statTotalSourceExceptions = statTotalSourceExceptions.labels(metricsLabels);
 
-        statTotalWritten = Counter.build()
+        statTotalWritten = collectorRegistry.registerIfNotExist(
+                PULSAR_SOURCE_METRICS_PREFIX + WRITTEN_TOTAL,
+                Counter.build()
                 .name(PULSAR_SOURCE_METRICS_PREFIX + WRITTEN_TOTAL)
                 .help("Total number of records written to a Pulsar topic.")
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statTotalWritten = statTotalWritten.labels(metricsLabels);
 
-        statlastInvocation = Gauge.build()
+        statlastInvocation = collectorRegistry.registerIfNotExist(
+                PULSAR_SOURCE_METRICS_PREFIX + LAST_INVOCATION,
+                Gauge.build()
                 .name(PULSAR_SOURCE_METRICS_PREFIX + LAST_INVOCATION)
                 .help("The timestamp of the last invocation of the source.")
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statlastInvocation = statlastInvocation.labels(metricsLabels);
 
-        statTotalRecordsReceived1min = Counter.build()
+        statTotalRecordsReceived1min = collectorRegistry.registerIfNotExist(
+                PULSAR_SOURCE_METRICS_PREFIX + RECEIVED_TOTAL_1min,
+                Counter.build()
                 .name(PULSAR_SOURCE_METRICS_PREFIX + RECEIVED_TOTAL_1min)
                 .help("Total number of records received from source in the last 1 minute.")
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statTotalRecordsReceived1min = statTotalRecordsReceived1min.labels(metricsLabels);
 
-        statTotalSysExceptions1min = Counter.build()
+        statTotalSysExceptions1min = collectorRegistry.registerIfNotExist(
+                PULSAR_SOURCE_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min,
+                Counter.build()
                 .name(PULSAR_SOURCE_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min)
                 .help("Total number of system exceptions in the last 1 minute.")
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statTotalSysExceptions1min = statTotalSysExceptions1min.labels(metricsLabels);
 
-        statTotalSourceExceptions1min = Counter.build()
+        statTotalSourceExceptions1min = collectorRegistry.registerIfNotExist(
+                PULSAR_SOURCE_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL_1min,
+                Counter.build()
                 .name(PULSAR_SOURCE_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL_1min)
                 .help("Total number of source exceptions in the last 1 minute.")
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statTotalSourceExceptions1min = statTotalSourceExceptions1min.labels(metricsLabels);
 
-        statTotalWritten1min = Counter.build()
+        statTotalWritten1min = collectorRegistry.registerIfNotExist(
+                PULSAR_SOURCE_METRICS_PREFIX + WRITTEN_TOTAL_1min,
+                Counter.build()
                 .name(PULSAR_SOURCE_METRICS_PREFIX + WRITTEN_TOTAL_1min)
                 .help("Total number of records written to a Pulsar topic in the last 1 minute.")
                 .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
+                .create());
         _statTotalWritten1min = statTotalWritten1min.labels(metricsLabels);
 
-        sysExceptions = Gauge.build()
+        sysExceptions = collectorRegistry.registerIfNotExist(
+                PULSAR_SOURCE_METRICS_PREFIX + "system_exception",
+                Gauge.build()
                 .name(PULSAR_SOURCE_METRICS_PREFIX + "system_exception")
                 .labelNames(exceptionMetricsLabelNames)
                 .help("Exception from system code.")
-                .register(collectorRegistry);
+                .create());
 
-        sourceExceptions = Gauge.build()
+        sourceExceptions = collectorRegistry.registerIfNotExist(
+                PULSAR_SOURCE_METRICS_PREFIX + "source_exception",
+                Gauge.build()
                 .name(PULSAR_SOURCE_METRICS_PREFIX + "source_exception")
                 .labelNames(exceptionMetricsLabelNames)
                 .help("Exception from source.")
-                .register(collectorRegistry);
+                .create());
 
         sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
         sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index ceb87c3..8bc8a54 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.instance.state.BKStateStoreImpl;
 import org.apache.pulsar.functions.instance.state.InstanceStateManager;
+import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
 import org.slf4j.Logger;
@@ -92,7 +93,7 @@ public class ContextImplTest {
             config,
             logger,
             client,
-            new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0],
+            new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0],
                 FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
                 pulsarAdmin);
         context.setCurrentMessageContext((Record<String>) () -> null);
@@ -182,7 +183,7 @@ public class ContextImplTest {
                 config,
                 logger,
                 client,
-                new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0],
+                new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0],
                 FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
                 pulsarAdmin);
         context.getPulsarAdmin();
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index f599139..c7e0c28 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -61,6 +61,7 @@ import org.apache.pulsar.common.nar.FileUtils;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.apache.pulsar.functions.runtime.RuntimeUtils;
@@ -171,7 +172,7 @@ public class LocalRunner implements AutoCloseable {
     protected String secretsProviderClassName;
     @Parameter(names = "--secretsProviderConfig", description = "Whats the config for the secrets provider", hidden = true)
     protected String secretsProviderConfig;
-    @Parameter(names = "--metricsPortStart", description = "The starting port range for metrics server", hidden = true)
+    @Parameter(names = "--metricsPortStart", description = "The starting port range for metrics server. When running instances as threads, one metrics server is used to host the stats for all instances.", hidden = true)
     protected Integer metricsPortStart;
 
     private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650";
@@ -485,7 +486,16 @@ public class LocalRunner implements AutoCloseable {
                 instanceConfig.setInstanceId(i + instanceIdOffset);
                 instanceConfig.setMaxBufferedTuples(1024);
                 instanceConfig.setPort(FunctionCommon.findAvailablePort());
-                instanceConfig.setMetricsPort(FunctionCommon.findAvailablePort());
+
+                if (metricsPortStart != null) {
+                    int metricsPort = metricsPortStart + i; // process mode: each instance gets its own metrics port
+                    if (metricsPort < 0 || metricsPort > 65535) { // validate the derived port; start + i may leave the valid range
+                        throw new IllegalArgumentException("Metrics port need to be within the range of 0 and 65535");
+                    }
+                    instanceConfig.setMetricsPort(metricsPort);
+                } else {
+                    instanceConfig.setMetricsPort(FunctionCommon.findAvailablePort());
+                }
                 instanceConfig.setClusterName("local");
                 if (functionConfig != null) {
                     instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
@@ -537,6 +547,13 @@ public class LocalRunner implements AutoCloseable {
                                            int parallelism, int instanceIdOffset, String serviceUrl,
                                            String stateStorageServiceUrl, AuthenticationConfig authConfig,
                                            String userCodeFile) throws Exception {
+
+        if (metricsPortStart != null) {
+            if (metricsPortStart < 0 || metricsPortStart > 65535) {
+                throw new IllegalArgumentException("Metrics port need to be within the range of 0 and 65535");
+            }
+        }
+
         SecretsProvider secretsProvider;
         if (secretsProviderClassName != null) {
             secretsProvider = (SecretsProvider) Reflections.createInstance(secretsProviderClassName, ClassLoader.getSystemClassLoader());
@@ -554,7 +571,7 @@ public class LocalRunner implements AutoCloseable {
         }
 
         // Collector Registry for prometheus metrics
-        CollectorRegistry collectorRegistry = new CollectorRegistry();
+        FunctionCollectorRegistry collectorRegistry = FunctionCollectorRegistry.getDefaultImplementation();
         RuntimeUtils.registerDefaultCollectors(collectorRegistry);
 
         ThreadRuntimeFactory threadRuntimeFactory;
@@ -583,10 +600,7 @@ public class LocalRunner implements AutoCloseable {
             instanceConfig.setInstanceId(i + instanceIdOffset);
             instanceConfig.setMaxBufferedTuples(1024);
             if (metricsPortStart != null) {
-                if (metricsPortStart < 0 || metricsPortStart > 65535) {
-                    throw new IllegalArgumentException("Metrics port need to be within the range of 0 and 65535");
-                }
-                instanceConfig.setMetricsPort(metricsPortStart + i);
+                instanceConfig.setMetricsPort(metricsPortStart);
             }
             instanceConfig.setClusterName("local");
             if (functionConfig != null) {
@@ -604,13 +618,12 @@ public class LocalRunner implements AutoCloseable {
                     30000);
             spawners.add(runtimeSpawner);
             runtimeSpawner.start();
+        }
 
-            if (metricsPortStart != null) {
-                // starting metrics server
-                log.info("Starting metrics server on port {}", instanceConfig.getMetricsPort());
-                new HTTPServer(new InetSocketAddress(instanceConfig.getMetricsPort()), collectorRegistry, true);
-            }
-
+        if (metricsPortStart != null) {
+            // starting metrics server
+            log.info("Starting metrics server on port {}", metricsPortStart);
+            new HTTPServer(new InetSocketAddress(metricsPortStart), collectorRegistry, true);
         }
     }
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index 0de9782..170e606 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.functions.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceCache;
 import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
@@ -191,7 +192,7 @@ public class JavaInstanceStarter implements AutoCloseable {
         secretsProvider.init(secretsProviderConfigMap);
 
         // Collector Registry for prometheus metrics
-        CollectorRegistry collectorRegistry = new CollectorRegistry();
+        FunctionCollectorRegistry collectorRegistry = FunctionCollectorRegistry.getDefaultImplementation();
         RuntimeUtils.registerDefaultCollectors(collectorRegistry);
 
         containerFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup", pulsarServiceUrl,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index c62a6bf..8c359a9 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.functions.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.instance.go.GoInstanceConfig;
+import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 
@@ -475,7 +476,7 @@ public class RuntimeUtils {
         return ObjectMapperFactory.getThreadLocal().convertValue(configMap, functionRuntimeConfigClass);
     }
 
-    public static void registerDefaultCollectors(CollectorRegistry registry) {
+    public static void registerDefaultCollectors(FunctionCollectorRegistry registry) {
         // Add the JMX exporter for functionality similar to the kafka connect JMX metrics
         try {
             new JmxCollector("{}").register(registry);
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
index ec9343e..ad4d400 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.instance.InstanceUtils;
+import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
@@ -65,7 +66,7 @@ public class ThreadRuntime implements Runtime {
     private PulsarAdmin pulsarAdmin;
     private String stateStorageServiceUrl;
     private SecretsProvider secretsProvider;
-    private CollectorRegistry collectorRegistry;
+    private FunctionCollectorRegistry collectorRegistry;
     private String narExtractionDirectory;
     private final Optional<ConnectorsManager> connectorsManager;
 
@@ -77,7 +78,7 @@ public class ThreadRuntime implements Runtime {
                   PulsarAdmin pulsarAdmin,
                   String stateStorageServiceUrl,
                   SecretsProvider secretsProvider,
-                  CollectorRegistry collectorRegistry,
+                  FunctionCollectorRegistry collectorRegistry,
                   String narExtractionDirectory,
                   Optional<ConnectorsManager> connectorsManager) {
         this.instanceConfig = instanceConfig;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index 1be328b..d1f450c 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.common.functions.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceCache;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.instance.InstanceUtils;
+import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
 import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeUtils;
@@ -60,7 +61,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
     private PulsarAdmin pulsarAdmin;
     private String storageServiceUrl;
     private SecretsProvider defaultSecretsProvider;
-    private CollectorRegistry collectorRegistry;
+    private FunctionCollectorRegistry collectorRegistry;
     private String narExtractionDirectory;
     private volatile boolean closed;
     private SecretsProviderConfigurator secretsProviderConfigurator;
@@ -73,7 +74,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
      */
     public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, String storageServiceUrl,
                                 AuthenticationConfig authConfig, SecretsProvider secretsProvider,
-                                CollectorRegistry collectorRegistry, String narExtractionDirectory,
+                                FunctionCollectorRegistry collectorRegistry, String narExtractionDirectory,
                                 ClassLoader rootClassLoader, boolean exposePulsarAdminClientEnabled,
                                 String pulsarWebServiceUrl) throws Exception {
         initialize(threadGroupName, Optional.empty(), pulsarServiceUrl, authConfig,
@@ -83,7 +84,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
 
     private void initialize(String threadGroupName, Optional<ThreadRuntimeFactoryConfig.MemoryLimit> memoryLimit, String pulsarServiceUrl, AuthenticationConfig authConfig, String storageServiceUrl,
                             SecretsProviderConfigurator secretsProviderConfigurator, SecretsProvider secretsProvider,
-                            CollectorRegistry collectorRegistry, String narExtractionDirectory,
+                            FunctionCollectorRegistry collectorRegistry, String narExtractionDirectory,
                             ClassLoader rootClassLoader, boolean exposePulsarAdminClientEnabled,
                             String pulsarWebServiceUrl, Optional<ConnectorsManager> connectorsManager) throws PulsarClientException {