You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/15 22:18:20 UTC

[incubator-pulsar] branch master updated: Functions metrics prometheus (#1373)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fbd8c3  Functions metrics prometheus (#1373)
6fbd8c3 is described below

commit 6fbd8c3c63c66cd1bfc08d04c2c553ffb78e0b55
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu Mar 15 15:18:18 2018 -0700

    Functions metrics prometheus (#1373)
    
    * adding pulsar function stats to broker prometheus
    
    * refactoring class name
    
    * adding missing license header
    
    * refactoring code and removing function's metrics sink
    
    * fixing header
    
    * adding instance liveness check
    
    * adding null check
    
    * adding unittest
---
 conf/functions_worker.yml                          |   7 +-
 .../stats/prometheus/NamespaceStatsAggregator.java |   2 +-
 .../prometheus/PrometheusMetricsGenerator.java     |   6 +-
 .../stats/prometheus/PrometheusMetricsServlet.java |   1 -
 .../pulsar/broker/stats/prometheus/TopicStats.java |   2 +-
 .../org/apache/pulsar/utils/StatsOutputStream.java |   1 +
 .../pulsar/utils/SimpleTextOutputStreamTest.java   |   3 +-
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  |   3 +-
 .../apache/pulsar/common/util}/NumberFormat.java   |   2 +-
 .../common/util}/SimpleTextOutputStream.java       |   9 +-
 .../pulsar/functions/runtime/JavaInstanceMain.java |  21 ++-
 .../pulsar/functions/runtime/RuntimeSpawner.java   |  50 ++---
 pulsar-functions/worker/pom.xml                    |   6 +
 .../pulsar/functions/worker/FunctionActioner.java  |  11 +-
 .../functions/worker/FunctionRuntimeManager.java   |  24 +--
 .../functions/worker/FunctionsStatsGenerator.java  |  93 ++++++++++
 .../pulsar/functions/worker/WorkerConfig.java      |  28 +--
 .../pulsar/functions/worker/WorkerService.java     |  15 --
 .../pulsar/functions/worker/rest/Resources.java    |  21 ++-
 .../pulsar/functions/worker/rest/WorkerServer.java |  27 ++-
 .../worker/rest/api/FunctionsMetricsResource.java  |  61 +++++++
 .../worker/rest/api/v2/FunctionApiV2Resource.java  |   1 -
 .../worker/FunctionRuntimeManagerTest.java         |   3 -
 .../worker/FunctionStatsGeneratorTest.java         | 201 +++++++++++++++++++++
 .../functions/worker/MembershipManagerTest.java    |   2 -
 .../functions/worker/SchedulerManagerTest.java     |   2 -
 26 files changed, 455 insertions(+), 147 deletions(-)

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index daaeae9..ff9d231 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -28,12 +28,6 @@ pulsarServiceUrl: pulsar://localhost:6650
 pulsarWebServiceUrl: http://localhost:8080
 numFunctionPackageReplicas: 1
 downloadDirectory: /tmp/pulsar_functions
-metricsConfig:
-  metricsSinkClassName: org.apache.pulsar.functions.metrics.sink.PrometheusSink
-  metricsCollectionInterval: 30
-  metricsSinkConfig:
-    path: /metrics
-    port: 9099
 #threadContainerFactory:
 #  threadGroupName: "Thread Function Container Group"
 processContainerFactory:
@@ -45,3 +39,4 @@ failureCheckFreqMs: 30000
 rescheduleTimeoutMs: 60000
 initialBrokerReconnectMaxRetries: 60
 assignmentWriteMaxRetries: 60
+instanceLivenessCheckFreqMs: 30000
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 18693da..22115ea 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -23,7 +23,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
-import org.apache.pulsar.utils.SimpleTextOutputStream;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 import io.netty.util.concurrent.FastThreadLocal;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index e7bcda1..167ec1c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -24,7 +24,7 @@ import java.util.Enumeration;
 
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.stats.metrics.JvmMetrics;
-import org.apache.pulsar.utils.SimpleTextOutputStream;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
@@ -36,6 +36,7 @@ import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Gauge;
 import io.prometheus.client.Gauge.Child;
 import io.prometheus.client.hotspot.DefaultExports;
+import org.apache.pulsar.functions.worker.FunctionsStatsGenerator;
 
 /**
  * Generate metrics aggregated at the namespace level and optionally at a topic level and formats them out
@@ -71,6 +72,9 @@ public class PrometheusMetricsGenerator {
 
             NamespaceStatsAggregator.generate(pulsar, includeTopicMetrics, stream);
 
+            FunctionsStatsGenerator.generate(pulsar.getWorkerService(),
+                    pulsar.getConfiguration().getClusterName(), stream);
+
             out.write(buf.array(), buf.arrayOffset(), buf.readableBytes());
         } finally {
             buf.release();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
index 0dc8749..b924177 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
@@ -60,7 +60,6 @@ public class PrometheusMetricsServlet extends HttpServlet {
     protected void doGet(HttpServletRequest request, HttpServletResponse response)
             throws ServletException, IOException {
         AsyncContext context = request.startAsync();
-
         executor.execute(safeRun(() -> {
             HttpServletResponse res = (HttpServletResponse) context.getResponse();
             try {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index a750b75..f5938b6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.broker.stats.prometheus;
 
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
 import org.apache.bookkeeper.mledger.util.StatsBuckets;
-import org.apache.pulsar.utils.SimpleTextOutputStream;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 import java.util.HashMap;
 import java.util.Map;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/StatsOutputStream.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/StatsOutputStream.java
index 7bdc66b..90d5c3e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/StatsOutputStream.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/StatsOutputStream.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.utils;
 import java.util.Stack;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 public class StatsOutputStream extends SimpleTextOutputStream {
     private final Stack<Boolean> separators = new Stack<>();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleTextOutputStreamTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleTextOutputStreamTest.java
index 57a5b64..030ff30 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleTextOutputStreamTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleTextOutputStreamTest.java
@@ -22,8 +22,7 @@ import static org.testng.Assert.assertEquals;
 
 import java.nio.charset.Charset;
 
-import org.apache.pulsar.utils.SimpleTextOutputStream;
-import org.apache.pulsar.utils.StatsOutputStream;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index ae348ae..b38aed2 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -477,8 +477,7 @@ public class CmdFunctions extends CmdBase {
                             instanceConfig,
                             userCodeFile,
                             containerFactory,
-                            null,
-                            0);
+                            null);
                     spawners.add(runtimeSpawner);
                     runtimeSpawner.start();
                 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/NumberFormat.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NumberFormat.java
similarity index 98%
rename from pulsar-broker/src/main/java/org/apache/pulsar/utils/NumberFormat.java
rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/NumberFormat.java
index 0db4adb..eaf3664 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/NumberFormat.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NumberFormat.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.utils;
+package org.apache.pulsar.common.util;
 
 import io.netty.buffer.ByteBuf;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleTextOutputStream.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
similarity index 94%
rename from pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleTextOutputStream.java
rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
index 091c529..3f75fde 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleTextOutputStream.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.utils;
+package org.apache.pulsar.common.util;
 
 import io.netty.buffer.ByteBuf;
 
@@ -99,17 +99,17 @@ public class SimpleTextOutputStream {
         return this;
     }
 
-    SimpleTextOutputStream write(boolean b) {
+    public SimpleTextOutputStream write(boolean b) {
         write(b ? "true" : "false");
         return this;
     }
 
-    SimpleTextOutputStream write(long n) {
+    public SimpleTextOutputStream write(long n) {
         NumberFormat.format(this.buffer, n);
         return this;
     }
 
-    SimpleTextOutputStream write(double d) {
+    public SimpleTextOutputStream write(double d) {
         long i = (long) d;
         write(i);
 
@@ -131,5 +131,4 @@ public class SimpleTextOutputStream {
         write(r);
         return this;
     }
-
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 08bfbe4..594ea10 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.functions.proto.InstanceControlGrpc;
 
 import java.lang.reflect.Type;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 
 /**
  * A function container implemented using java thread.
@@ -166,8 +167,7 @@ public class JavaInstanceMain {
                 instanceConfig,
                 jarFile,
                 containerFactory,
-                null,
-                0);
+                null);
 
         server = ServerBuilder.forPort(port)
                 .addService(new InstanceControlImpl(runtimeSpawner))
@@ -221,5 +221,22 @@ public class JavaInstanceMain {
                 throw new RuntimeException(e);
             }
         }
+
+        @Override
+        public void getAndResetMetrics(com.google.protobuf.Empty request,
+                                       io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData> responseObserver) {
+            Runtime runtime = runtimeSpawner.getRuntime();
+            if (runtime != null) {
+                try {
+                    InstanceCommunication.MetricsData metrics = runtime.getAndResetMetrics().get();
+                    responseObserver.onNext(metrics);
+                    responseObserver.onCompleted();
+                } catch (InterruptedException | ExecutionException e) {
+                    log.error("Exception in JavaInstance doing getAndResetMetrics", e);
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
     }
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index d6c203a..6bcb988 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -31,10 +31,6 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
-import org.apache.pulsar.functions.runtime.RuntimeFactory;
-import org.apache.pulsar.functions.runtime.Runtime;
-import org.apache.pulsar.functions.metrics.MetricsSink;
-import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 
 @Slf4j
 public class RuntimeSpawner implements AutoCloseable {
@@ -45,22 +41,19 @@ public class RuntimeSpawner implements AutoCloseable {
 
     @Getter
     private Runtime runtime;
-    private MetricsSink metricsSink;
-    private int metricsCollectionInterval;
-    private Timer metricsCollectionTimer;
+    private Timer processLivenessCheckTimer;
     private int numRestarts;
+    private Long instanceLivenessCheckFreqMs;
+
 
     public RuntimeSpawner(InstanceConfig instanceConfig,
                           String codeFile,
-                          RuntimeFactory containerFactory,
-                          MetricsSink metricsSink,
-                          int metricsCollectionInterval) {
+                          RuntimeFactory containerFactory, Long instanceLivenessCheckFreqMs) {
         this.instanceConfig = instanceConfig;
         this.runtimeFactory = containerFactory;
         this.codeFile = codeFile;
-        this.metricsSink = metricsSink;
-        this.metricsCollectionInterval = metricsCollectionInterval;
         this.numRestarts = 0;
+        this.instanceLivenessCheckFreqMs = instanceLivenessCheckFreqMs;
     }
 
     public void start() throws Exception {
@@ -68,34 +61,21 @@ public class RuntimeSpawner implements AutoCloseable {
                 this.instanceConfig.getInstanceId());
         runtime = runtimeFactory.createContainer(this.instanceConfig, codeFile);
         runtime.start();
-        if (metricsSink != null) {
-            log.info("Scheduling Metrics Collection every {} secs for {} - {}",
-                    metricsCollectionInterval,
-                    FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()),
-                    instanceConfig.getInstanceId());
-            metricsCollectionTimer = new Timer();
-            metricsCollectionTimer.scheduleAtFixedRate(new TimerTask() {
+
+        // monitor function runtime to make sure it is running.  If not, restart the function runtime
+        if (instanceLivenessCheckFreqMs != null) {
+            processLivenessCheckTimer = new Timer();
+            processLivenessCheckTimer.scheduleAtFixedRate(new TimerTask() {
                 @Override
                 public void run() {
-                    if (runtime.isAlive()) {
-
-                        log.info("Collecting metrics for function {} - {}",
-                                FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()),
-                                instanceConfig.getInstanceId());
-                        runtime.getAndResetMetrics().thenAccept(t -> {
-                            if (t != null) {
-                                log.debug("Collected metrics {}", t);
-                                metricsSink.processRecord(t, instanceConfig.getFunctionConfig());
-                            }
-                        });
-                    } else {
+                    if (!runtime.isAlive()) {
                         log.error("Function Container is dead with exception", runtime.getDeathException());
                         log.error("Restarting...");
                         runtime.start();
                         numRestarts++;
                     }
                 }
-            }, metricsCollectionInterval * 1000, metricsCollectionInterval * 1000);
+            }, instanceLivenessCheckFreqMs, instanceLivenessCheckFreqMs);
         }
     }
 
@@ -122,9 +102,9 @@ public class RuntimeSpawner implements AutoCloseable {
             runtime.stop();
             runtime = null;
         }
-        if (metricsCollectionTimer != null) {
-            metricsCollectionTimer.cancel();
-            metricsCollectionTimer = null;
+        if (processLivenessCheckTimer != null) {
+            processLivenessCheckTimer.cancel();
+            processLivenessCheckTimer = null;
         }
     }
 }
diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml
index 3dab594..a4e9f03 100644
--- a/pulsar-functions/worker/pom.xml
+++ b/pulsar-functions/worker/pom.xml
@@ -119,6 +119,12 @@
     </dependency>
 
     <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.glassfish.jersey.media</groupId>
       <artifactId>jersey-media-json-jackson</artifactId>
     </dependency>
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 296e7d0..1eb3d28 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -32,7 +32,6 @@ import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.instance.InstanceConfig;
-import org.apache.pulsar.functions.metrics.MetricsSink;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 
@@ -52,8 +51,6 @@ public class FunctionActioner implements AutoCloseable {
 
     private final WorkerConfig workerConfig;
     private final RuntimeFactory runtimeFactory;
-    private final MetricsSink metricsSink;
-    private final int metricsCollectionInterval;
     private final Namespace dlogNamespace;
     private LinkedBlockingQueue<FunctionAction> actionQueue;
     private volatile boolean running;
@@ -61,14 +58,10 @@ public class FunctionActioner implements AutoCloseable {
 
     public FunctionActioner(WorkerConfig workerConfig,
                             RuntimeFactory runtimeFactory,
-                            MetricsSink metricsSink,
-                            int metricCollectionInterval,
                             Namespace dlogNamespace,
                             LinkedBlockingQueue<FunctionAction> actionQueue) {
         this.workerConfig = workerConfig;
         this.runtimeFactory = runtimeFactory;
-        this.metricsSink = metricsSink;
-        this.metricsCollectionInterval = metricCollectionInterval;
         this.dlogNamespace = dlogNamespace;
         this.actionQueue = actionQueue;
         actioner = new Thread(() -> {
@@ -168,8 +161,8 @@ public class FunctionActioner implements AutoCloseable {
         instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
         instanceConfig.setInstanceId(String.valueOf(instanceId));
         instanceConfig.setMaxBufferedTuples(1024);
-        RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, pkgFile.getAbsolutePath(), runtimeFactory,
-                metricsSink, metricsCollectionInterval);
+        RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, pkgFile.getAbsolutePath(),
+                runtimeFactory, workerConfig.getInstanceLivenessCheckFreqMs());
 
         functionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
         runtimeSpawner.start();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 1d65078..a20c782 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -31,7 +31,6 @@ import org.apache.pulsar.functions.proto.Request.AssignmentsUpdate;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
 import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
-import org.apache.pulsar.functions.metrics.MetricsSink;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 
 import javax.ws.rs.client.Client;
@@ -73,8 +72,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
 
     private final FunctionAssignmentTailer functionAssignmentTailer;
 
-    private MetricsSink metricsSink;
-
     private FunctionActioner functionActioner;
 
     private RuntimeFactory runtimeFactory;
@@ -110,12 +107,9 @@ public class FunctionRuntimeManager implements AutoCloseable{
             throw new RuntimeException("Either Thread or Process Container Factory need to be set");
         }
 
-        this.metricsSink = createMetricsSink();
-
         this.actionQueue = new LinkedBlockingQueue<>();
 
         this.functionActioner = new FunctionActioner(this.workerConfig, runtimeFactory,
-                this.metricsSink, this.workerConfig.getMetricsConfig().getMetricsCollectionInterval(),
                 dlogNamespace, actionQueue);
 
         this.membershipManager = membershipManager;
@@ -127,7 +121,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
     public void start() {
         log.info("/** Starting Function Runtime Manager **/");
         log.info("Initialize metrics sink...");
-        this.metricsSink.init(this.workerConfig.getMetricsConfig().getMetricsSinkConfig());
         log.info("Starting function actioner...");
         this.functionActioner.start();
         log.info("Starting function assignment tailer...");
@@ -428,6 +421,9 @@ public class FunctionRuntimeManager implements AutoCloseable{
         }
     }
 
+    public Map<String, FunctionRuntimeInfo> getFunctionRuntimeInfos() {
+        return this.functionRuntimeInfoMap;
+    }
     /**
      * Private methods for internal use.  Should not be used outside of this class
      */
@@ -517,20 +513,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
         this.functionAssignmentTailer.close();
     }
 
-    private MetricsSink createMetricsSink() {
-        String className = workerConfig.getMetricsConfig().getMetricsSinkClassName();
-        try {
-            MetricsSink sink = (MetricsSink) Class.forName(className).newInstance();
-            return sink;
-        } catch (InstantiationException e) {
-            throw new RuntimeException(e + " IMetricsSink class must have a no-arg constructor.");
-        } catch (IllegalAccessException e) {
-            throw new RuntimeException(e + " IMetricsSink class must be concrete.");
-        } catch (ClassNotFoundException e) {
-            throw new RuntimeException(e + " IMetricsSink class must be a class path.");
-        }
-    }
-
     private Map<String, Assignment> diff(Map<String, Assignment> assignmentMap1, Map<String, Assignment> assignmentMap2) {
         Map<String, Assignment> result = new HashMap<>();
         for (Map.Entry<String, Assignment> entry : assignmentMap1.entrySet()) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
new file mode 100644
index 0000000..c9fd539
--- /dev/null
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.runtime.Runtime;
+import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A class to generate stats for pulsar functions running on this broker
+ */
+public class FunctionsStatsGenerator {
+
+    private static final Logger log = LoggerFactory.getLogger(FunctionsStatsGenerator.class);
+
+    public static void generate(WorkerService workerService, String cluster, SimpleTextOutputStream out) {
+        if (workerService != null) {
+            Map<String, FunctionRuntimeInfo> functionRuntimes
+                    = workerService.getFunctionRuntimeManager().getFunctionRuntimeInfos();
+
+            for (Map.Entry<String, FunctionRuntimeInfo> entry : functionRuntimes.entrySet()) {
+                String fullyQualifiedInstanceName = entry.getKey();
+                FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
+                RuntimeSpawner functionRuntimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
+
+                if (functionRuntimeSpawner != null) {
+                    Runtime functionRuntime = functionRuntimeSpawner.getRuntime();
+                    if (functionRuntime != null) {
+                        try {
+                            InstanceCommunication.MetricsData metrics = functionRuntime.getAndResetMetrics().get();
+                            for (Map.Entry<String, InstanceCommunication.MetricsData.DataDigest> metricsEntry
+                                    : metrics.getMetricsMap().entrySet()) {
+                                String metricName = metricsEntry.getKey();
+                                InstanceCommunication.MetricsData.DataDigest dataDigest = metricsEntry.getValue();
+
+                                String tenant = functionRuntimeInfo.getFunctionInstance()
+                                        .getFunctionMetaData().getFunctionConfig().getTenant();
+                                String namespace = functionRuntimeInfo.getFunctionInstance()
+                                        .getFunctionMetaData().getFunctionConfig().getNamespace();
+                                String name = functionRuntimeInfo.getFunctionInstance()
+                                        .getFunctionMetaData().getFunctionConfig().getName();
+                                int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
+                                String qualifiedNamespace = String.format("%s/%s", tenant, namespace);
+
+                                metric(out, cluster, qualifiedNamespace, name, String.format("pulsar_function%scount", metricName),
+                                        instanceId, dataDigest.getCount());
+                                metric(out, cluster, qualifiedNamespace, name, String.format("pulsar_function%smax", metricName),
+                                        instanceId, dataDigest.getMax());
+                                metric(out, cluster, qualifiedNamespace,name, String.format("pulsar_function%smin", metricName),
+                                        instanceId, dataDigest.getMin());
+                                metric(out, cluster, qualifiedNamespace, name, String.format("pulsar_function%ssum", metricName),
+                                        instanceId, dataDigest.getSum());
+
+                            }
+                        } catch (InterruptedException | ExecutionException e) {
+                            log.warn("Failed to collect metrics for function instance {}",
+                                    fullyQualifiedInstanceName, e);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace,
+                               String functionName, String metricName, int instanceId, double value) {
+        stream.write(metricName).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
+                .write("\", name=\"").write(functionName).write("\", instanceId=\"").write(instanceId).write("\"} ");
+        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+    }
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 579572a..fb875b5 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -49,7 +49,6 @@ public class WorkerConfig implements Serializable {
     private String pulsarFunctionsNamespace;
     private int numFunctionPackageReplicas;
     private String downloadDirectory;
-    private MetricsConfig metricsConfig;
     private long snapshotFreqMs;
     private String stateStorageServiceUrl;
     private String functionAssignmentTopicName;
@@ -58,6 +57,7 @@ public class WorkerConfig implements Serializable {
     private long rescheduleTimeoutMs;
     private int initialBrokerReconnectMaxRetries;
     private int assignmentWriteMaxRetries;
+    private long instanceLivenessCheckFreqMs;
 
     @Data
     @Setter
@@ -97,30 +97,4 @@ public class WorkerConfig implements Serializable {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
         return mapper.readValue(new File(yamlFile), WorkerConfig.class);
     }
-
-    @Data
-    @Setter
-    @Getter
-    @EqualsAndHashCode
-    @ToString
-    @AllArgsConstructor
-    @NoArgsConstructor
-    @Accessors(chain = true)
-    /**
-     * This represents the config related to the resource limits of function calls
-     */
-    public static class MetricsConfig implements Serializable {
-
-        private static final long serialVersionUID = 1L;
-
-        private String metricsSinkClassName;
-        private int metricsCollectionInterval;
-        private Map<String, String> metricsSinkConfig;
-
-        public static MetricsConfig load(String yamlFile) throws IOException {
-            ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
-            return mapper.readValue(new File(yamlFile), MetricsConfig.class);
-        }
-
-    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 732ab07..20ffd8d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -58,21 +58,6 @@ public class WorkerService {
         this.workerConfig = workerConfig;
     }
 
-    public static ServletContextHandler newServletContextHandler(String contextPath, WorkerService workerService) {
-        final ResourceConfig config = new ResourceConfig(Resources.get());
-        final ServletContextHandler contextHandler =
-                new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
-
-        contextHandler.setAttribute(FunctionApiResource.ATTRIBUTE_FUNCTION_WORKER, workerService);
-        contextHandler.setContextPath(contextPath);
-
-        final ServletHolder apiServlet =
-                new ServletHolder(new ServletContainer(config));
-        contextHandler.addServlet(apiServlet, "/*");
-
-        return contextHandler;
-    }
-
     public void start() throws InterruptedException {
         try {
             start(FunctionMetadataSetup.setupFunctionMetadata(workerConfig));
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
index 702ab4c..ff43e25 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.functions.worker.rest;
 
+import org.apache.pulsar.functions.worker.rest.api.FunctionsMetricsResource;
 import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource;
 import org.glassfish.jersey.media.multipart.MultiPartFeature;
 
@@ -31,15 +32,19 @@ public final class Resources {
     private Resources() {
     }
 
-    public static Set<Class<?>> get() {
-        return new HashSet<>(getClasses());
+    public static Set<Class<?>> getApiResources() {
+        return new HashSet<>(
+                Arrays.asList(
+                        FunctionApiV2Resource.class,
+                        MultiPartFeature.class
+                ));
     }
 
-    private static List<Class<?>> getClasses() {
-        return Arrays.asList(
-                ConfigurationResource.class,
-                FunctionApiV2Resource.class,
-                MultiPartFeature.class
-        );
+    public static Set<Class<?>> getRootResources() {
+        return new HashSet<>(
+                Arrays.asList(
+                        ConfigurationResource.class,
+                        FunctionsMetricsResource.class
+                ));
     }
 }
\ No newline at end of file
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 0e897da..524a6ad 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -31,6 +31,10 @@ import java.net.URI;
 import org.eclipse.jetty.server.handler.ContextHandlerCollection;
 import org.eclipse.jetty.server.handler.DefaultHandler;
 import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.servlet.ServletContainer;
 
 @Slf4j
 public class WorkerServer implements Runnable {
@@ -57,8 +61,13 @@ public class WorkerServer implements Runnable {
         final Server server = new Server(this.workerConfig.getWorkerPort());
 
         List<Handler> handlers = new ArrayList<>(2);
-        handlers.add(WorkerService.newServletContextHandler("/admin", workerService));
-        handlers.add(WorkerService.newServletContextHandler("/admin/v2", workerService));
+        handlers.add(newServletContextHandler("/admin",
+                new ResourceConfig(Resources.getApiResources()), workerService));
+        handlers.add(newServletContextHandler("/admin/v2",
+                new ResourceConfig(Resources.getApiResources()), workerService));
+        handlers.add(newServletContextHandler("/",
+                new ResourceConfig(Resources.getRootResources()), workerService));
+
         ContextHandlerCollection contexts = new ContextHandlerCollection();
         contexts.setHandlers(handlers.toArray(new Handler[handlers.size()]));
         HandlerCollection handlerCollection = new HandlerCollection();
@@ -86,4 +95,18 @@ public class WorkerServer implements Runnable {
     public String getThreadName() {
         return "worker-server-thread-" + this.workerConfig.getWorkerId();
     }
+
+    /**
+     * Creates a session-less Jetty context mounted at {@code contextPath} that serves the JAX-RS
+     * resources described by {@code config}.
+     *
+     * @param contextPath   path the context is mounted under
+     * @param config        Jersey resource configuration served by this context
+     * @param workerService stored as the {@link FunctionApiResource#ATTRIBUTE_FUNCTION_WORKER}
+     *                      context attribute
+     * @return the configured context handler
+     */
+    public static ServletContextHandler newServletContextHandler(String contextPath, ResourceConfig config, WorkerService workerService) {
+        final ServletContextHandler handler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
+        handler.setAttribute(FunctionApiResource.ATTRIBUTE_FUNCTION_WORKER, workerService);
+        handler.setContextPath(contextPath);
+        // a single Jersey container is mapped to every path below the context root
+        handler.addServlet(new ServletHolder(new ServletContainer(config)), "/*");
+        return handler;
+    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java
new file mode 100644
index 0000000..0b87060
--- /dev/null
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest.api;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+import org.apache.pulsar.functions.worker.FunctionsStatsGenerator;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+
+/**
+ * Root-level REST resource that serves the function instance metrics generated by
+ * {@link FunctionsStatsGenerator} as plain text under {@code /metrics}.
+ */
+@Path("/")
+public class FunctionsMetricsResource extends FunctionApiResource {
+    /**
+     * Renders the metrics of the worker returned by {@code get()} into a temporary buffer and
+     * returns them as the response body. The cluster label is currently hard-coded to
+     * {@code "default"}.
+     *
+     * @return a 200 response whose entity streams the generated metrics text
+     * @throws JsonProcessingException declared for interface compatibility; nothing in this method
+     *                                 throws it directly
+     */
+    @Path("metrics")
+    @GET
+    @Produces(MediaType.TEXT_PLAIN)
+    public Response getMetrics() throws JsonProcessingException {
+
+        WorkerService workerService = get();
+
+        ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
+        final byte[] payload;
+        try {
+            SimpleTextOutputStream stream = new SimpleTextOutputStream(buf);
+            FunctionsStatsGenerator.generate(workerService,"default", stream);
+            // Copy the bytes out BEFORE releasing the buffer: the StreamingOutput below is invoked by
+            // the JAX-RS runtime only after this method has returned, at which point the buffer's
+            // backing array may already have been handed to another allocation.
+            payload = new byte[buf.readableBytes()];
+            buf.getBytes(buf.readerIndex(), payload);
+        } finally {
+            buf.release();
+        }
+
+        StreamingOutput streamOut = out -> {
+            out.write(payload);
+            out.flush();
+        };
+        return Response.ok(streamOut).build();
+    }
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 25ecf0a..7effcd8 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -128,5 +128,4 @@ public class FunctionApiV2Resource extends FunctionApiResource {
     public Response getAssignments() {
         return functions.getAssignments();
     }
-
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 327b4f6..3bca55c 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -79,7 +79,6 @@ public class FunctionRuntimeManagerTest {
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
         workerConfig.setFunctionAssignmentTopicName("assignments");
-        workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig().setMetricsSinkClassName(TestSink.class.getName()));
 
         PulsarClient pulsarClient = mock(PulsarClient.class);
         ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
@@ -173,7 +172,6 @@ public class FunctionRuntimeManagerTest {
         workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
-        workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig().setMetricsSinkClassName(TestSink.class.getName()));
 
         PulsarClient pulsarClient = mock(PulsarClient.class);
         ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
@@ -271,7 +269,6 @@ public class FunctionRuntimeManagerTest {
         workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
-        workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig().setMetricsSinkClassName(TestSink.class.getName()));
 
         PulsarClient pulsarClient = mock(PulsarClient.class);
         ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
new file mode 100644
index 0000000..5bdb812
--- /dev/null
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import lombok.ToString;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.runtime.Runtime;
+import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Unit test for {@link FunctionsStatsGenerator}: feeds it one mocked function instance and checks
+ * the text it writes, parsed back with a small Prometheus-text parser.
+ */
+public class FunctionStatsGeneratorTest {
+
+    // Example of lines are
+    // jvm_threads_current{cluster="standalone",} 203.0
+    // or
+    // pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1",
+    // topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897
+    private static final Pattern METRIC_LINE_PATTERN =
+            Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s(-?[\\d\\w\\.]+)(\\s(\\d+))?$");
+    private static final Pattern TAGS_PATTERN = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
+
+    /**
+     * Generates stats for a single mocked instance that reports two metrics
+     * ({@code __total_processed__} and {@code __avg_latency_ms__}) and verifies that the
+     * count/max/sum/min sample of each one is emitted with the expected labels and value.
+     */
+    @Test
+    public void testFunctionsStatsGenerate() {
+        FunctionRuntimeManager functionRuntimeManager = mock(FunctionRuntimeManager.class);
+        Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new HashMap<>();
+
+        WorkerService workerService = mock(WorkerService.class);
+        doReturn(functionRuntimeManager).when(workerService).getFunctionRuntimeManager();
+
+        CompletableFuture<InstanceCommunication.MetricsData> metricsDataCompletableFuture = new CompletableFuture<>();
+        InstanceCommunication.MetricsData metricsData = InstanceCommunication.MetricsData.newBuilder()
+                .putMetrics(
+                        "__total_processed__",
+                        InstanceCommunication.MetricsData.DataDigest.newBuilder()
+                                .setCount(100.0).setMax(200.0).setSum(300.0).setMin(0.0).build())
+                .putMetrics("__avg_latency_ms__",
+                        InstanceCommunication.MetricsData.DataDigest.newBuilder()
+                                .setCount(10.0).setMax(20.0).setSum(30.0).setMin(0.0).build())
+                .build();
+
+        metricsDataCompletableFuture.complete(metricsData);
+        Runtime runtime = mock(Runtime.class);
+        doReturn(metricsDataCompletableFuture).when(runtime).getAndResetMetrics();
+
+        RuntimeSpawner runtimeSpawner = mock(RuntimeSpawner.class);
+        doReturn(runtime).when(runtimeSpawner).getRuntime();
+
+        Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionConfig(
+                Function.FunctionConfig.newBuilder()
+                        .setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
+
+        Function.Instance instance = Function.Instance.newBuilder()
+                .setFunctionMetaData(function1).setInstanceId(0).build();
+
+        FunctionRuntimeInfo functionRuntimeInfo = mock(FunctionRuntimeInfo.class);
+        doReturn(runtimeSpawner).when(functionRuntimeInfo).getRuntimeSpawner();
+        doReturn(instance).when(functionRuntimeInfo).getFunctionInstance();
+
+        functionRuntimeInfoMap.put(Utils.getFullyQualifiedInstanceId(instance), functionRuntimeInfo);
+        doReturn(functionRuntimeInfoMap).when(functionRuntimeManager).getFunctionRuntimeInfos();
+
+        ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
+        String str;
+        try {
+            SimpleTextOutputStream statsOut = new SimpleTextOutputStream(buf);
+            FunctionsStatsGenerator.generate(workerService, "default", statsOut);
+            // decode with a fixed charset so the test does not depend on the platform default
+            str = buf.toString(Charset.forName("UTF-8"));
+        } finally {
+            // release even when generation fails, so the buffer is never leaked
+            buf.release();
+        }
+        Map<String, Metric> metrics = parseMetrics(str);
+
+        Assert.assertEquals(metrics.size(), 8);
+
+        assertFunctionMetric(metrics, "pulsar_function__total_processed__count", 100.0);
+        assertFunctionMetric(metrics, "pulsar_function__total_processed__max", 200.0);
+        assertFunctionMetric(metrics, "pulsar_function__total_processed__sum", 300.0);
+        assertFunctionMetric(metrics, "pulsar_function__total_processed__min", 0.0);
+
+        assertFunctionMetric(metrics, "pulsar_function__avg_latency_ms__count", 10.0);
+        assertFunctionMetric(metrics, "pulsar_function__avg_latency_ms__max", 20.0);
+        assertFunctionMetric(metrics, "pulsar_function__avg_latency_ms__sum", 30.0);
+        assertFunctionMetric(metrics, "pulsar_function__avg_latency_ms__min", 0.0);
+    }
+
+    /**
+     * Asserts that {@code metrics} contains {@code metricName} carrying the labels of the single
+     * mocked instance (cluster "default", instance 0 of test-tenant/test-namespace/func-1) and the
+     * given sample value.
+     */
+    private static void assertFunctionMetric(Map<String, Metric> metrics, String metricName, double expectedValue) {
+        Metric m = metrics.get(metricName);
+        // fail with the metric name instead of a bare NullPointerException
+        Assert.assertNotNull(m, "Missing metric " + metricName);
+        assertEquals(m.tags.get("cluster"), "default");
+        assertEquals(m.tags.get("instanceId"), "0");
+        assertEquals(m.tags.get("name"), "func-1");
+        assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
+        assertEquals(m.value, expectedValue);
+    }
+
+    /**
+     * Hacky parsing of Prometheus text format. Should be good enough for unit tests.
+     *
+     * @param metrics newline-separated metric lines; empty lines are skipped
+     * @return parsed metrics keyed by metric name; a later line with the same name replaces an
+     *         earlier one
+     * @throws IllegalArgumentException if a non-empty line does not match the expected layout
+     */
+    private static Map<String, Metric> parseMetrics(String metrics) {
+        Map<String, Metric> parsed = new HashMap<>();
+
+        Arrays.asList(metrics.split("\n")).forEach(line -> {
+            if (line.isEmpty()) {
+                return;
+            }
+            Matcher matcher = METRIC_LINE_PATTERN.matcher(line);
+
+            checkArgument(matcher.matches(), "Unparseable metrics line: %s", line);
+            String name = matcher.group(1);
+
+            Metric m = new Metric();
+            m.value = Double.parseDouble(matcher.group(3));
+
+            String tags = matcher.group(2);
+            Matcher tagsMatcher = TAGS_PATTERN.matcher(tags);
+            while (tagsMatcher.find()) {
+                String tag = tagsMatcher.group(1);
+                String value = tagsMatcher.group(2);
+                m.tags.put(tag, value);
+            }
+
+            parsed.put(name, m);
+        });
+
+        return parsed;
+    }
+
+    /** One parsed sample: its label set (sorted by label name) and its numeric value. */
+    @ToString
+    static class Metric {
+        Map<String, String> tags = new TreeMap<>();
+        double value;
+    }
+
+}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 43b08a1..4fd72b9 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -60,7 +60,6 @@ public class MembershipManagerTest {
         workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
-        workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig().setMetricsSinkClassName(FunctionRuntimeManagerTest.TestSink.class.getName()));
     }
 
     @Test
@@ -250,7 +249,6 @@ public class MembershipManagerTest {
         workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
-        workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig().setMetricsSinkClassName(FunctionRuntimeManagerTest.TestSink.class.getName()));
         workerConfig.setRescheduleTimeoutMs(30000);
         SchedulerManager schedulerManager = mock(SchedulerManager.class);
         PulsarClient pulsarClient = mock(PulsarClient.class);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index a5ada20..b115b17 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -69,8 +69,6 @@ public class SchedulerManagerTest {
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
         workerConfig.setFunctionAssignmentTopicName("assignments");
-        workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig()
-                .setMetricsSinkClassName(FunctionRuntimeManagerTest.TestSink.class.getName()));
         workerConfig.setSchedulerClassName(RoundRobinScheduler.class.getName());
         workerConfig.setAssignmentWriteMaxRetries(0);
 

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.