You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/03/15 22:18:20 UTC

[GitHub] merlimat closed pull request #1373: Functions metrics prometheus

merlimat closed pull request #1373: Functions metrics prometheus
URL: https://github.com/apache/incubator-pulsar/pull/1373
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request was opened from a fork, GitHub does not keep
the diff visible once the PR is merged, so the full diff is reproduced below:

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index daaeae9a9..ff9d2311b 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -28,12 +28,6 @@ pulsarServiceUrl: pulsar://localhost:6650
 pulsarWebServiceUrl: http://localhost:8080
 numFunctionPackageReplicas: 1
 downloadDirectory: /tmp/pulsar_functions
-metricsConfig:
-  metricsSinkClassName: org.apache.pulsar.functions.metrics.sink.PrometheusSink
-  metricsCollectionInterval: 30
-  metricsSinkConfig:
-    path: /metrics
-    port: 9099
 #threadContainerFactory:
 #  threadGroupName: "Thread Function Container Group"
 processContainerFactory:
@@ -45,3 +39,4 @@ failureCheckFreqMs: 30000
 rescheduleTimeoutMs: 60000
 initialBrokerReconnectMaxRetries: 60
 assignmentWriteMaxRetries: 60
+instanceLivenessCheckFreqMs: 30000
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 18693dae8..22115ea60 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -23,7 +23,7 @@
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
-import org.apache.pulsar.utils.SimpleTextOutputStream;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 import io.netty.util.concurrent.FastThreadLocal;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index e7bcda1b0..167ec1c61 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -24,7 +24,7 @@
 
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.stats.metrics.JvmMetrics;
-import org.apache.pulsar.utils.SimpleTextOutputStream;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
@@ -36,6 +36,7 @@
 import io.prometheus.client.Gauge;
 import io.prometheus.client.Gauge.Child;
 import io.prometheus.client.hotspot.DefaultExports;
+import org.apache.pulsar.functions.worker.FunctionsStatsGenerator;
 
 /**
  * Generate metrics aggregated at the namespace level and optionally at a topic level and formats them out
@@ -71,6 +72,9 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, O
 
             NamespaceStatsAggregator.generate(pulsar, includeTopicMetrics, stream);
 
+            FunctionsStatsGenerator.generate(pulsar.getWorkerService(),
+                    pulsar.getConfiguration().getClusterName(), stream);
+
             out.write(buf.array(), buf.arrayOffset(), buf.readableBytes());
         } finally {
             buf.release();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
index 0dc8749bc..b924177c3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
@@ -60,7 +60,6 @@ public void init() throws ServletException {
     protected void doGet(HttpServletRequest request, HttpServletResponse response)
             throws ServletException, IOException {
         AsyncContext context = request.startAsync();
-
         executor.execute(safeRun(() -> {
             HttpServletResponse res = (HttpServletResponse) context.getResponse();
             try {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index a750b7558..f5938b613 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -20,7 +20,7 @@
 
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
 import org.apache.bookkeeper.mledger.util.StatsBuckets;
-import org.apache.pulsar.utils.SimpleTextOutputStream;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 import java.util.HashMap;
 import java.util.Map;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/StatsOutputStream.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/StatsOutputStream.java
index 7bdc66b2c..90d5c3e16 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/StatsOutputStream.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/StatsOutputStream.java
@@ -21,6 +21,7 @@
 import java.util.Stack;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 public class StatsOutputStream extends SimpleTextOutputStream {
     private final Stack<Boolean> separators = new Stack<>();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleTextOutputStreamTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleTextOutputStreamTest.java
index 57a5b643d..030ff3037 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleTextOutputStreamTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleTextOutputStreamTest.java
@@ -22,8 +22,7 @@
 
 import java.nio.charset.Charset;
 
-import org.apache.pulsar.utils.SimpleTextOutputStream;
-import org.apache.pulsar.utils.StatsOutputStream;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index c8010e778..cb39a3f4d 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -478,8 +478,7 @@ void runCmd() throws Exception {
                             instanceConfig,
                             userCodeFile,
                             containerFactory,
-                            null,
-                            0);
+                            null);
                     spawners.add(runtimeSpawner);
                     runtimeSpawner.start();
                 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/NumberFormat.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NumberFormat.java
similarity index 98%
rename from pulsar-broker/src/main/java/org/apache/pulsar/utils/NumberFormat.java
rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/NumberFormat.java
index 0db4adbba..eaf3664f6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/NumberFormat.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NumberFormat.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.utils;
+package org.apache.pulsar.common.util;
 
 import io.netty.buffer.ByteBuf;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleTextOutputStream.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
similarity index 94%
rename from pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleTextOutputStream.java
rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
index 091c529c6..3f75fde11 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleTextOutputStream.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.utils;
+package org.apache.pulsar.common.util;
 
 import io.netty.buffer.ByteBuf;
 
@@ -99,17 +99,17 @@ public SimpleTextOutputStream writeEncoded(String s) {
         return this;
     }
 
-    SimpleTextOutputStream write(boolean b) {
+    public SimpleTextOutputStream write(boolean b) {
         write(b ? "true" : "false");
         return this;
     }
 
-    SimpleTextOutputStream write(long n) {
+    public SimpleTextOutputStream write(long n) {
         NumberFormat.format(this.buffer, n);
         return this;
     }
 
-    SimpleTextOutputStream write(double d) {
+    public SimpleTextOutputStream write(double d) {
         long i = (long) d;
         write(i);
 
@@ -131,5 +131,4 @@ SimpleTextOutputStream write(double d) {
         write(r);
         return this;
     }
-
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index badee9461..627ccd4b0 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -36,6 +36,7 @@
 
 import java.lang.reflect.Type;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 
 /**
  * A function container implemented using java thread.
@@ -166,8 +167,7 @@ public void start() throws Exception {
                 instanceConfig,
                 jarFile,
                 containerFactory,
-                null,
-                0);
+                null);
 
         server = ServerBuilder.forPort(port)
                 .addService(new InstanceControlImpl(runtimeSpawner))
@@ -221,5 +221,22 @@ public void getFunctionStatus(Empty request, StreamObserver<InstanceCommunicatio
                 throw new RuntimeException(e);
             }
         }
+
+        @Override
+        public void getAndResetMetrics(com.google.protobuf.Empty request,
+                                       io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData> responseObserver) {
+            Runtime runtime = runtimeSpawner.getRuntime();
+            if (runtime != null) {
+                try {
+                    InstanceCommunication.MetricsData metrics = runtime.getAndResetMetrics().get();
+                    responseObserver.onNext(metrics);
+                    responseObserver.onCompleted();
+                } catch (InterruptedException | ExecutionException e) {
+                    log.error("Exception in JavaInstance doing getAndResetMetrics", e);
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
     }
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index d6c203a35..6bcb988e1 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -31,10 +31,6 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
-import org.apache.pulsar.functions.runtime.RuntimeFactory;
-import org.apache.pulsar.functions.runtime.Runtime;
-import org.apache.pulsar.functions.metrics.MetricsSink;
-import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 
 @Slf4j
 public class RuntimeSpawner implements AutoCloseable {
@@ -45,22 +41,19 @@
 
     @Getter
     private Runtime runtime;
-    private MetricsSink metricsSink;
-    private int metricsCollectionInterval;
-    private Timer metricsCollectionTimer;
+    private Timer processLivenessCheckTimer;
     private int numRestarts;
+    private Long instanceLivenessCheckFreqMs;
+
 
     public RuntimeSpawner(InstanceConfig instanceConfig,
                           String codeFile,
-                          RuntimeFactory containerFactory,
-                          MetricsSink metricsSink,
-                          int metricsCollectionInterval) {
+                          RuntimeFactory containerFactory, Long instanceLivenessCheckFreqMs) {
         this.instanceConfig = instanceConfig;
         this.runtimeFactory = containerFactory;
         this.codeFile = codeFile;
-        this.metricsSink = metricsSink;
-        this.metricsCollectionInterval = metricsCollectionInterval;
         this.numRestarts = 0;
+        this.instanceLivenessCheckFreqMs = instanceLivenessCheckFreqMs;
     }
 
     public void start() throws Exception {
@@ -68,34 +61,21 @@ public void start() throws Exception {
                 this.instanceConfig.getInstanceId());
         runtime = runtimeFactory.createContainer(this.instanceConfig, codeFile);
         runtime.start();
-        if (metricsSink != null) {
-            log.info("Scheduling Metrics Collection every {} secs for {} - {}",
-                    metricsCollectionInterval,
-                    FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()),
-                    instanceConfig.getInstanceId());
-            metricsCollectionTimer = new Timer();
-            metricsCollectionTimer.scheduleAtFixedRate(new TimerTask() {
+
+        // monitor function runtime to make sure it is running.  If not, restart the function runtime
+        if (instanceLivenessCheckFreqMs != null) {
+            processLivenessCheckTimer = new Timer();
+            processLivenessCheckTimer.scheduleAtFixedRate(new TimerTask() {
                 @Override
                 public void run() {
-                    if (runtime.isAlive()) {
-
-                        log.info("Collecting metrics for function {} - {}",
-                                FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()),
-                                instanceConfig.getInstanceId());
-                        runtime.getAndResetMetrics().thenAccept(t -> {
-                            if (t != null) {
-                                log.debug("Collected metrics {}", t);
-                                metricsSink.processRecord(t, instanceConfig.getFunctionConfig());
-                            }
-                        });
-                    } else {
+                    if (!runtime.isAlive()) {
                         log.error("Function Container is dead with exception", runtime.getDeathException());
                         log.error("Restarting...");
                         runtime.start();
                         numRestarts++;
                     }
                 }
-            }, metricsCollectionInterval * 1000, metricsCollectionInterval * 1000);
+            }, instanceLivenessCheckFreqMs, instanceLivenessCheckFreqMs);
         }
     }
 
@@ -122,9 +102,9 @@ public void close() {
             runtime.stop();
             runtime = null;
         }
-        if (metricsCollectionTimer != null) {
-            metricsCollectionTimer.cancel();
-            metricsCollectionTimer = null;
+        if (processLivenessCheckTimer != null) {
+            processLivenessCheckTimer.cancel();
+            processLivenessCheckTimer = null;
         }
     }
 }
diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml
index 3dab594f6..a4e9f034c 100644
--- a/pulsar-functions/worker/pom.xml
+++ b/pulsar-functions/worker/pom.xml
@@ -118,6 +118,12 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>org.glassfish.jersey.media</groupId>
       <artifactId>jersey-media-json-jackson</artifactId>
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index cc77e47ea..aa3477907 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -29,7 +29,6 @@
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.instance.InstanceConfig;
-import org.apache.pulsar.functions.metrics.MetricsSink;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 
@@ -49,8 +48,6 @@
 
     private final WorkerConfig workerConfig;
     private final RuntimeFactory runtimeFactory;
-    private final MetricsSink metricsSink;
-    private final int metricsCollectionInterval;
     private final Namespace dlogNamespace;
     private LinkedBlockingQueue<FunctionAction> actionQueue;
     private volatile boolean running;
@@ -58,14 +55,10 @@
 
     public FunctionActioner(WorkerConfig workerConfig,
                             RuntimeFactory runtimeFactory,
-                            MetricsSink metricsSink,
-                            int metricCollectionInterval,
                             Namespace dlogNamespace,
                             LinkedBlockingQueue<FunctionAction> actionQueue) {
         this.workerConfig = workerConfig;
         this.runtimeFactory = runtimeFactory;
-        this.metricsSink = metricsSink;
-        this.metricsCollectionInterval = metricCollectionInterval;
         this.dlogNamespace = dlogNamespace;
         this.actionQueue = actionQueue;
         actioner = new Thread(() -> {
@@ -171,8 +164,8 @@ private void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Excep
         instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
         instanceConfig.setInstanceId(String.valueOf(instanceId));
         instanceConfig.setMaxBufferedTuples(1024);
-        RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, pkgFile.getAbsolutePath(), runtimeFactory,
-                metricsSink, metricsCollectionInterval);
+        RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, pkgFile.getAbsolutePath(),
+                runtimeFactory, workerConfig.getInstanceLivenessCheckFreqMs());
 
         functionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
         runtimeSpawner.start();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 1d65078a1..a20c782e0 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -31,7 +31,6 @@
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
 import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
-import org.apache.pulsar.functions.metrics.MetricsSink;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 
 import javax.ws.rs.client.Client;
@@ -73,8 +72,6 @@
 
     private final FunctionAssignmentTailer functionAssignmentTailer;
 
-    private MetricsSink metricsSink;
-
     private FunctionActioner functionActioner;
 
     private RuntimeFactory runtimeFactory;
@@ -110,12 +107,9 @@ public FunctionRuntimeManager(WorkerConfig workerConfig,
             throw new RuntimeException("Either Thread or Process Container Factory need to be set");
         }
 
-        this.metricsSink = createMetricsSink();
-
         this.actionQueue = new LinkedBlockingQueue<>();
 
         this.functionActioner = new FunctionActioner(this.workerConfig, runtimeFactory,
-                this.metricsSink, this.workerConfig.getMetricsConfig().getMetricsCollectionInterval(),
                 dlogNamespace, actionQueue);
 
         this.membershipManager = membershipManager;
@@ -127,7 +121,6 @@ public FunctionRuntimeManager(WorkerConfig workerConfig,
     public void start() {
         log.info("/** Starting Function Runtime Manager **/");
         log.info("Initialize metrics sink...");
-        this.metricsSink.init(this.workerConfig.getMetricsConfig().getMetricsSinkConfig());
         log.info("Starting function actioner...");
         this.functionActioner.start();
         log.info("Starting function assignment tailer...");
@@ -428,6 +421,9 @@ public synchronized void processAssignmentUpdate(MessageId messageId, Assignment
         }
     }
 
+    public Map<String, FunctionRuntimeInfo> getFunctionRuntimeInfos() {
+        return this.functionRuntimeInfoMap;
+    }
     /**
      * Private methods for internal use.  Should not be used outside of this class
      */
@@ -517,20 +513,6 @@ public void close() throws Exception {
         this.functionAssignmentTailer.close();
     }
 
-    private MetricsSink createMetricsSink() {
-        String className = workerConfig.getMetricsConfig().getMetricsSinkClassName();
-        try {
-            MetricsSink sink = (MetricsSink) Class.forName(className).newInstance();
-            return sink;
-        } catch (InstantiationException e) {
-            throw new RuntimeException(e + " IMetricsSink class must have a no-arg constructor.");
-        } catch (IllegalAccessException e) {
-            throw new RuntimeException(e + " IMetricsSink class must be concrete.");
-        } catch (ClassNotFoundException e) {
-            throw new RuntimeException(e + " IMetricsSink class must be a class path.");
-        }
-    }
-
     private Map<String, Assignment> diff(Map<String, Assignment> assignmentMap1, Map<String, Assignment> assignmentMap2) {
         Map<String, Assignment> result = new HashMap<>();
         for (Map.Entry<String, Assignment> entry : assignmentMap1.entrySet()) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
new file mode 100644
index 000000000..c9fd53987
--- /dev/null
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.runtime.Runtime;
+import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A class to generate stats for pulsar functions running on this broker
+ */
+public class FunctionsStatsGenerator {
+
+    private static final Logger log = LoggerFactory.getLogger(FunctionsStatsGenerator.class);
+
+    public static void generate(WorkerService workerService, String cluster, SimpleTextOutputStream out) {
+        if (workerService != null) {
+            Map<String, FunctionRuntimeInfo> functionRuntimes
+                    = workerService.getFunctionRuntimeManager().getFunctionRuntimeInfos();
+
+            for (Map.Entry<String, FunctionRuntimeInfo> entry : functionRuntimes.entrySet()) {
+                String fullyQualifiedInstanceName = entry.getKey();
+                FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
+                RuntimeSpawner functionRuntimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
+
+                if (functionRuntimeSpawner != null) {
+                    Runtime functionRuntime = functionRuntimeSpawner.getRuntime();
+                    if (functionRuntime != null) {
+                        try {
+                            InstanceCommunication.MetricsData metrics = functionRuntime.getAndResetMetrics().get();
+                            for (Map.Entry<String, InstanceCommunication.MetricsData.DataDigest> metricsEntry
+                                    : metrics.getMetricsMap().entrySet()) {
+                                String metricName = metricsEntry.getKey();
+                                InstanceCommunication.MetricsData.DataDigest dataDigest = metricsEntry.getValue();
+
+                                String tenant = functionRuntimeInfo.getFunctionInstance()
+                                        .getFunctionMetaData().getFunctionConfig().getTenant();
+                                String namespace = functionRuntimeInfo.getFunctionInstance()
+                                        .getFunctionMetaData().getFunctionConfig().getNamespace();
+                                String name = functionRuntimeInfo.getFunctionInstance()
+                                        .getFunctionMetaData().getFunctionConfig().getName();
+                                int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
+                                String qualifiedNamespace = String.format("%s/%s", tenant, namespace);
+
+                                metric(out, cluster, qualifiedNamespace, name, String.format("pulsar_function%scount", metricName),
+                                        instanceId, dataDigest.getCount());
+                                metric(out, cluster, qualifiedNamespace, name, String.format("pulsar_function%smax", metricName),
+                                        instanceId, dataDigest.getMax());
+                                metric(out, cluster, qualifiedNamespace,name, String.format("pulsar_function%smin", metricName),
+                                        instanceId, dataDigest.getMin());
+                                metric(out, cluster, qualifiedNamespace, name, String.format("pulsar_function%ssum", metricName),
+                                        instanceId, dataDigest.getSum());
+
+                            }
+                        } catch (InterruptedException | ExecutionException e) {
+                            log.warn("Failed to collect metrics for function instance {}",
+                                    fullyQualifiedInstanceName, e);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace,
+                               String functionName, String metricName, int instanceId, double value) {
+        stream.write(metricName).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
+                .write("\", name=\"").write(functionName).write("\", instanceId=\"").write(instanceId).write("\"} ");
+        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+    }
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 579572a58..fb875b5ee 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -49,7 +49,6 @@
     private String pulsarFunctionsNamespace;
     private int numFunctionPackageReplicas;
     private String downloadDirectory;
-    private MetricsConfig metricsConfig;
     private long snapshotFreqMs;
     private String stateStorageServiceUrl;
     private String functionAssignmentTopicName;
@@ -58,6 +57,7 @@
     private long rescheduleTimeoutMs;
     private int initialBrokerReconnectMaxRetries;
     private int assignmentWriteMaxRetries;
+    private long instanceLivenessCheckFreqMs;
 
     @Data
     @Setter
@@ -97,30 +97,4 @@ public static WorkerConfig load(String yamlFile) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
         return mapper.readValue(new File(yamlFile), WorkerConfig.class);
     }
-
-    @Data
-    @Setter
-    @Getter
-    @EqualsAndHashCode
-    @ToString
-    @AllArgsConstructor
-    @NoArgsConstructor
-    @Accessors(chain = true)
-    /**
-     * This represents the config related to the resource limits of function calls
-     */
-    public static class MetricsConfig implements Serializable {
-
-        private static final long serialVersionUID = 1L;
-
-        private String metricsSinkClassName;
-        private int metricsCollectionInterval;
-        private Map<String, String> metricsSinkConfig;
-
-        public static MetricsConfig load(String yamlFile) throws IOException {
-            ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
-            return mapper.readValue(new File(yamlFile), MetricsConfig.class);
-        }
-
-    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 732ab07c9..20ffd8d94 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -58,21 +58,6 @@ public WorkerService(WorkerConfig workerConfig) {
         this.workerConfig = workerConfig;
     }
 
-    public static ServletContextHandler newServletContextHandler(String contextPath, WorkerService workerService) {
-        final ResourceConfig config = new ResourceConfig(Resources.get());
-        final ServletContextHandler contextHandler =
-                new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
-
-        contextHandler.setAttribute(FunctionApiResource.ATTRIBUTE_FUNCTION_WORKER, workerService);
-        contextHandler.setContextPath(contextPath);
-
-        final ServletHolder apiServlet =
-                new ServletHolder(new ServletContainer(config));
-        contextHandler.addServlet(apiServlet, "/*");
-
-        return contextHandler;
-    }
-
     public void start() throws InterruptedException {
         try {
             start(FunctionMetadataSetup.setupFunctionMetadata(workerConfig));
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
index 702ab4cc0..ff43e2501 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.functions.worker.rest;
 
+import org.apache.pulsar.functions.worker.rest.api.FunctionsMetricsResource;
 import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource;
 import org.glassfish.jersey.media.multipart.MultiPartFeature;
 
@@ -31,15 +32,19 @@
     private Resources() {
     }
 
-    public static Set<Class<?>> get() {
-        return new HashSet<>(getClasses());
+    public static Set<Class<?>> getApiResources() {
+        return new HashSet<>(
+                Arrays.asList(
+                        FunctionApiV2Resource.class,
+                        MultiPartFeature.class
+                ));
     }
 
-    private static List<Class<?>> getClasses() {
-        return Arrays.asList(
-                ConfigurationResource.class,
-                FunctionApiV2Resource.class,
-                MultiPartFeature.class
-        );
+    public static Set<Class<?>> getRootResources() {
+        return new HashSet<>(
+                Arrays.asList(
+                        ConfigurationResource.class,
+                        FunctionsMetricsResource.class
+                ));
     }
 }
\ No newline at end of file
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 0e897da18..524a6ad29 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -31,6 +31,10 @@
 import org.eclipse.jetty.server.handler.ContextHandlerCollection;
 import org.eclipse.jetty.server.handler.DefaultHandler;
 import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.servlet.ServletContainer;
 
 @Slf4j
 public class WorkerServer implements Runnable {
@@ -57,8 +61,13 @@ public void run() {
         final Server server = new Server(this.workerConfig.getWorkerPort());
 
         List<Handler> handlers = new ArrayList<>(2);
-        handlers.add(WorkerService.newServletContextHandler("/admin", workerService));
-        handlers.add(WorkerService.newServletContextHandler("/admin/v2", workerService));
+        handlers.add(newServletContextHandler("/admin",
+                new ResourceConfig(Resources.getApiResources()), workerService));
+        handlers.add(newServletContextHandler("/admin/v2",
+                new ResourceConfig(Resources.getApiResources()), workerService));
+        handlers.add(newServletContextHandler("/",
+                new ResourceConfig(Resources.getRootResources()), workerService));
+
         ContextHandlerCollection contexts = new ContextHandlerCollection();
         contexts.setHandlers(handlers.toArray(new Handler[handlers.size()]));
         HandlerCollection handlerCollection = new HandlerCollection();
@@ -86,4 +95,18 @@ public void run() {
     public String getThreadName() {
         return "worker-server-thread-" + this.workerConfig.getWorkerId();
     }
+
+    /** Creates a session-less servlet context at {@code contextPath} that serves {@code config} through a Jersey container. */
+    public static ServletContextHandler newServletContextHandler(String contextPath, ResourceConfig config, WorkerService workerService) {
+        final ServletContextHandler contextHandler =
+                new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
+        // Publish the worker service as a context attribute under ATTRIBUTE_FUNCTION_WORKER.
+        contextHandler.setAttribute(FunctionApiResource.ATTRIBUTE_FUNCTION_WORKER, workerService);
+        contextHandler.setContextPath(contextPath);
+        // Map every path below the context to the Jersey servlet built from config.
+        final ServletHolder apiServlet =
+                new ServletHolder(new ServletContainer(config));
+        contextHandler.addServlet(apiServlet, "/*");
+        return contextHandler;
+    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java
new file mode 100644
index 000000000..0b8706041
--- /dev/null
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest.api;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+import org.apache.pulsar.functions.worker.FunctionsStatsGenerator;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+
+/** Serves the text produced by FunctionsStatsGenerator for this worker via GET {@code /metrics}. */
+@Path("/")
+public class FunctionsMetricsResource extends FunctionApiResource {
+    @Path("metrics")
+    @GET
+    @Produces(MediaType.TEXT_PLAIN)
+    public Response getMetrics() throws JsonProcessingException {
+        WorkerService workerService = get();
+        ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
+        try {
+            SimpleTextOutputStream stream = new SimpleTextOutputStream(buf);
+            FunctionsStatsGenerator.generate(workerService, "default", stream);
+            // Copy before release(): the StreamingOutput below runs after this method has
+            // returned, when the released buffer's backing array may already be reused.
+            byte[] payload = new byte[buf.readableBytes()];
+            buf.readBytes(payload);
+            StreamingOutput streamOut = out -> {
+                out.write(payload);
+                out.flush();
+            };
+            return Response.ok(streamOut).build();
+        } finally {
+            buf.release();
+        }
+    }
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 25ecf0a93..7effcd8d8 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -128,5 +128,4 @@ public Response getCluster() {
     public Response getAssignments() {
         return functions.getAssignments();
     }
-
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 327b4f6dd..3bca55c31 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -79,7 +79,6 @@ public void testProcessAssignmentUpdateAddFunctions() throws Exception {
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
         workerConfig.setFunctionAssignmentTopicName("assignments");
-        workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig().setMetricsSinkClassName(TestSink.class.getName()));
 
         PulsarClient pulsarClient = mock(PulsarClient.class);
         ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
@@ -173,7 +172,6 @@ public void testProcessAssignmentUpdateDeleteFunctions() throws Exception {
         workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
-        workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig().setMetricsSinkClassName(TestSink.class.getName()));
 
         PulsarClient pulsarClient = mock(PulsarClient.class);
         ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
@@ -271,7 +269,6 @@ public void testProcessAssignmentUpdateModifyFunctions() throws Exception {
         workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
-        workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig().setMetricsSinkClassName(TestSink.class.getName()));
 
         PulsarClient pulsarClient = mock(PulsarClient.class);
         ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
new file mode 100644
index 000000000..5bdb81275
--- /dev/null
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import lombok.ToString;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.runtime.Runtime;
+import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+
+/** Checks the text emitted by FunctionsStatsGenerator.generate for one mocked function instance. */
+public class FunctionStatsGeneratorTest {
+    @Test
+    public void testFunctionsStatsGenerate() {
+        FunctionRuntimeManager functionRuntimeManager = mock(FunctionRuntimeManager.class);
+        Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new HashMap<>();
+
+        WorkerService workerService = mock(WorkerService.class);
+        doReturn(functionRuntimeManager).when(workerService).getFunctionRuntimeManager();
+        // Canned metrics returned by the mocked runtime: two digests, each with count/max/sum/min.
+        CompletableFuture<InstanceCommunication.MetricsData> metricsDataCompletableFuture = new CompletableFuture<>();
+        InstanceCommunication.MetricsData metricsData = InstanceCommunication.MetricsData.newBuilder()
+                .putMetrics(
+                        "__total_processed__",
+                        InstanceCommunication.MetricsData.DataDigest.newBuilder()
+                                .setCount(100.0).setMax(200.0).setSum(300.0).setMin(0.0).build())
+                .putMetrics("__avg_latency_ms__",
+                        InstanceCommunication.MetricsData.DataDigest.newBuilder()
+                                .setCount(10.0).setMax(20.0).setSum(30.0).setMin(0.0).build())
+                .build();
+
+        metricsDataCompletableFuture.complete(metricsData);
+        Runtime runtime = mock(Runtime.class);
+        doReturn(metricsDataCompletableFuture).when(runtime).getAndResetMetrics();
+
+        RuntimeSpawner runtimeSpawner = mock(RuntimeSpawner.class);
+        doReturn(runtime).when(runtimeSpawner).getRuntime();
+
+        Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionConfig(
+                Function.FunctionConfig.newBuilder()
+                        .setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
+
+        Function.Instance instance = Function.Instance.newBuilder()
+                .setFunctionMetaData(function1).setInstanceId(0).build();
+
+        FunctionRuntimeInfo functionRuntimeInfo = mock(FunctionRuntimeInfo.class);
+        doReturn(runtimeSpawner).when(functionRuntimeInfo).getRuntimeSpawner();
+        doReturn(instance).when(functionRuntimeInfo).getFunctionInstance();
+
+        functionRuntimeInfoMap.put(Utils.getFullyQualifiedInstanceId(instance), functionRuntimeInfo);
+        doReturn(functionRuntimeInfoMap).when(functionRuntimeManager).getFunctionRuntimeInfos();
+        // Render the stats into a buffer, then parse the text back for the assertions below.
+        ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
+        SimpleTextOutputStream statsOut = new SimpleTextOutputStream(buf);
+        FunctionsStatsGenerator.generate(workerService, "default", statsOut);
+
+        String str = buf.toString(Charset.defaultCharset());
+        buf.release();
+        Map<String, Metric> metrics = parseMetrics(str);
+        // 2 digests x 4 fields (count, max, sum, min) = 8 distinct metric names.
+        Assert.assertEquals(metrics.size(), 8);
+
+        Metric m = metrics.get("pulsar_function__total_processed__count");
+        assertEquals(m.tags.get("cluster"), "default");
+        assertEquals(m.tags.get("instanceId"), "0");
+        assertEquals(m.tags.get("name"), "func-1");
+        assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
+        assertEquals(m.value, 100.0);
+
+        m = metrics.get("pulsar_function__total_processed__max");
+        assertEquals(m.tags.get("cluster"), "default");
+        assertEquals(m.tags.get("instanceId"), "0");
+        assertEquals(m.tags.get("name"), "func-1");
+        assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
+        assertEquals(m.value, 200.0);
+
+        m = metrics.get("pulsar_function__total_processed__sum");
+        assertEquals(m.tags.get("cluster"), "default");
+        assertEquals(m.tags.get("instanceId"), "0");
+        assertEquals(m.tags.get("name"), "func-1");
+        assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
+        assertEquals(m.value, 300.0);
+
+        m = metrics.get("pulsar_function__total_processed__min");
+        assertEquals(m.tags.get("cluster"), "default");
+        assertEquals(m.tags.get("instanceId"), "0");
+        assertEquals(m.tags.get("name"), "func-1");
+        assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
+        assertEquals(m.value, 0.0);
+
+        m = metrics.get("pulsar_function__avg_latency_ms__count");
+        assertEquals(m.tags.get("cluster"), "default");
+        assertEquals(m.tags.get("instanceId"), "0");
+        assertEquals(m.tags.get("name"), "func-1");
+        assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
+        assertEquals(m.value, 10.0);
+
+        m = metrics.get("pulsar_function__avg_latency_ms__max");
+        assertEquals(m.tags.get("cluster"), "default");
+        assertEquals(m.tags.get("instanceId"), "0");
+        assertEquals(m.tags.get("name"), "func-1");
+        assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
+        assertEquals(m.value, 20.0);
+
+        m = metrics.get("pulsar_function__avg_latency_ms__sum");
+        assertEquals(m.tags.get("cluster"), "default");
+        assertEquals(m.tags.get("instanceId"), "0");
+        assertEquals(m.tags.get("name"), "func-1");
+        assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
+        assertEquals(m.value, 30.0);
+
+        m = metrics.get("pulsar_function__avg_latency_ms__min");
+        assertEquals(m.tags.get("cluster"), "default");
+        assertEquals(m.tags.get("instanceId"), "0");
+        assertEquals(m.tags.get("name"), "func-1");
+        assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
+        assertEquals(m.value, 0.0);
+    }
+
+    /**
+     * Hacky parser for the Prometheus text format, keyed by metric name (a repeated name overwrites the earlier sample). Should be good enough for unit tests.
+     */
+    private static Map<String, Metric> parseMetrics(String metrics) {
+        Map<String, Metric> parsed = new HashMap<>();
+
+        // Example of lines are
+        // jvm_threads_current{cluster="standalone",} 203.0
+        // or
+        // pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1",
+        // topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897
+        Pattern pattern = Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s(-?[\\d\\w\\.]+)(\\s(\\d+))?$");
+        Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
+
+        Arrays.asList(metrics.split("\n")).forEach(line -> {
+            if (line.isEmpty()) {
+                return;
+            }
+            Matcher matcher = pattern.matcher(line);
+
+            checkArgument(matcher.matches());
+            String name = matcher.group(1);
+
+            Metric m = new Metric();
+            m.value = Double.valueOf(matcher.group(3));
+
+            String tags = matcher.group(2);
+            Matcher tagsMatcher = tagsPattern.matcher(tags);
+            while (tagsMatcher.find()) {
+                String tag = tagsMatcher.group(1);
+                String value = tagsMatcher.group(2);
+                m.tags.put(tag, value);
+            }
+
+            parsed.put(name, m);
+        });
+
+        return parsed;
+    }
+
+    @ToString
+    static class Metric {
+        Map<String, String> tags = new TreeMap<>();
+        double value;
+    }
+
+}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 43b08a17c..4fd72b992 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -60,7 +60,6 @@ public MembershipManagerTest() {
         workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
-        workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig().setMetricsSinkClassName(FunctionRuntimeManagerTest.TestSink.class.getName()));
     }
 
     @Test
@@ -250,7 +249,6 @@ public void testCheckFailuresSomeUnassigned() throws Exception {
         workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
-        workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig().setMetricsSinkClassName(FunctionRuntimeManagerTest.TestSink.class.getName()));
         workerConfig.setRescheduleTimeoutMs(30000);
         SchedulerManager schedulerManager = mock(SchedulerManager.class);
         PulsarClient pulsarClient = mock(PulsarClient.class);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index a5ada20e8..b115b1767 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -69,8 +69,6 @@ public void setup() throws PulsarClientException {
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
         workerConfig.setFunctionAssignmentTopicName("assignments");
-        workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig()
-                .setMetricsSinkClassName(FunctionRuntimeManagerTest.TestSink.class.getName()));
         workerConfig.setSchedulerClassName(RoundRobinScheduler.class.getName());
         workerConfig.setAssignmentWriteMaxRetries(0);
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services