You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this text.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/12/03 19:46:49 UTC

[pulsar] branch master updated: report function exceptions via prometheus (#3107)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8cd900c  report function exceptions via prometheus (#3107)
8cd900c is described below

commit 8cd900c83506a7cf41588f03c3aa00e3edd61294
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Mon Dec 3 11:46:45 2018 -0800

    report function exceptions via prometheus (#3107)
    
    ### Motivation
    
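    Allow exceptions thrown by functions to be reported via Prometheus, as the
    `pulsar_function_user_exception` and `pulsar_function_system_exception` gauges
    labelled with the error message (`error`) and timestamp in milliseconds (`ts`).
    Reporting is rate-limited to 5 exceptions per minute for each of the two kinds.
    The `function` label now carries only the function name; the fully qualified
    name (tenant/namespace/function) moves to a new `fqfn` label.
    
    Example output: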
    ```
    # TYPE pulsar_function_user_exception gauge
    pulsar_function_user_exception{cluster="standalone",error="error val: 0",fqfn="public/default/py-test",function="py-test",instance_id="0",namespace="public/default",tenant="public",ts="1543786615954"} 1.0
    pulsar_function_user_exception{tenant="public",namespace="public/default",function="test",instance_id="0",cluster="standalone",fqfn="public/default/test",error="error val: 5c53460e-03cf-4368-88d4-f23aeb3adf84",ts="1543787031371",} 1.0
    ```
---
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    | 66 ++++++++++++++--------
 pulsar-client-cpp/python/setup.py                  |  5 +-
 .../functions/instance/FunctionStatsManager.java   | 56 +++++++++++++++++-
 .../pulsar/functions/instance/InstanceCache.java   |  2 +-
 .../functions/instance/JavaInstanceRunnable.java   |  7 +--
 .../instance/src/main/python/function_stats.py     | 54 ++++++++++++++----
 .../instance/src/main/python/python_instance.py    |  9 +--
 7 files changed, 154 insertions(+), 45 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 90766d3..1980bce 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -407,68 +407,79 @@ public class PulsarFunctionE2ETest {
         Metric m = metrics.get("pulsar_function_received_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_received_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_user_exceptions_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_user_exceptions_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_process_latency_ms");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, Double.NaN);
         m = metrics.get("pulsar_function_process_latency_ms_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, Double.NaN);
         m = metrics.get("pulsar_function_system_exceptions_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_system_exceptions_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_last_invocation");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_processed_successfully_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_processed_successfully_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
 
 
@@ -554,68 +565,79 @@ public class PulsarFunctionE2ETest {
         m = metrics.get("pulsar_function_received_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, (double) totalMsgs);
         m = metrics.get("pulsar_function_received_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, (double) totalMsgs);
         m = metrics.get("pulsar_function_user_exceptions_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_user_exceptions_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_process_latency_ms");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertTrue(m.value > 0.0);
         m = metrics.get("pulsar_function_process_latency_ms_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertTrue(m.value > 0.0);
         m = metrics.get("pulsar_function_system_exceptions_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_system_exceptions_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_last_invocation");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertTrue(m.value > 0.0);
         m = metrics.get("pulsar_function_processed_successfully_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, (double) totalMsgs);
         m = metrics.get("pulsar_function_processed_successfully_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, (double) totalMsgs);
     }
 
diff --git a/pulsar-client-cpp/python/setup.py b/pulsar-client-cpp/python/setup.py
index 952c57b..ce8d259 100644
--- a/pulsar-client-cpp/python/setup.py
+++ b/pulsar-client-cpp/python/setup.py
@@ -70,6 +70,9 @@ setup(
     license="Apache License v2.0",
     url="http://pulsar.apache.org/",
     install_requires=[
-        'grpcio', 'protobuf', "prometheus_client"
+        'grpcio', 'protobuf',
+        # functions dependencies
+        "prometheus_client",
+        "ratelimit"
     ],
 )
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
index c1b7574..f9a3c77 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
@@ -27,10 +27,12 @@ import io.prometheus.client.exporter.common.TextFormat;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.util.RateLimiter;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.Arrays;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -43,7 +45,13 @@ import java.util.concurrent.TimeUnit;
 @Setter
 public class FunctionStatsManager implements AutoCloseable {
 
-    static final String[] metricsLabelNames = {"tenant", "namespace", "function", "instance_id", "cluster"};
+    static final String[] metricsLabelNames = {"tenant", "namespace", "function", "instance_id", "cluster", "fqfn"};
+    static final String[] exceptionMetricsLabelNames;
+    static {
+        exceptionMetricsLabelNames = Arrays.copyOf(metricsLabelNames, metricsLabelNames.length + 2);
+        exceptionMetricsLabelNames[metricsLabelNames.length] = "error";
+        exceptionMetricsLabelNames[metricsLabelNames.length + 1] = "ts";
+    }
 
     public static final String PULSAR_FUNCTION_METRICS_PREFIX = "pulsar_function_";
     public final static String USER_METRIC_PREFIX = "user_metric_";
@@ -88,6 +96,12 @@ public class FunctionStatsManager implements AutoCloseable {
 
     final Counter statTotalRecordsRecieved1min;
 
+    // exceptions
+
+    final Gauge userExceptions;
+
+    final Gauge sysExceptions;
+
     private String[] metricsLabels;
 
     private ScheduledFuture<?> scheduledFuture;
@@ -99,6 +113,10 @@ public class FunctionStatsManager implements AutoCloseable {
     @Getter
     private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSystemExceptions = EvictingQueue.create(10);
 
+    private final RateLimiter userExceptionRateLimiter;
+
+    private final RateLimiter sysExceptionRateLimiter;
+
     public FunctionStatsManager(CollectorRegistry collectorRegistry, String[] metricsLabels, ScheduledExecutorService scheduledExecutorService) {
 
         this.collectorRegistry = collectorRegistry;
@@ -179,6 +197,18 @@ public class FunctionStatsManager implements AutoCloseable {
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
 
+        userExceptions = Gauge.build()
+                .name(PULSAR_FUNCTION_METRICS_PREFIX + "user_exception")
+                .labelNames(exceptionMetricsLabelNames)
+                .help("Exception from user code.")
+                .register(collectorRegistry);
+
+        sysExceptions = Gauge.build()
+                .name(PULSAR_FUNCTION_METRICS_PREFIX + "system_exception")
+                .labelNames(exceptionMetricsLabelNames)
+                .help("Exception from system code.")
+                .register(collectorRegistry);
+
         scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
             @Override
             public void run() {
@@ -189,21 +219,41 @@ public class FunctionStatsManager implements AutoCloseable {
                 }
             }
         }, 1, 1, TimeUnit.MINUTES);
+
+        userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
+        sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
     }
 
     public void addUserException(Exception ex) {
+        long ts = System.currentTimeMillis();
         InstanceCommunication.FunctionStatus.ExceptionInformation info =
                     InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
-                    .setExceptionString(ex.getMessage()).setMsSinceEpoch(System.currentTimeMillis()).build();
+                    .setExceptionString(ex.getMessage()).setMsSinceEpoch(ts).build();
         latestUserExceptions.add(info);
+
+        // report exception throw prometheus
+        if (userExceptionRateLimiter.tryAcquire()) {
+            String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage();
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+            userExceptions.labels(exceptionMetricsLabels).set(1.0);
+        }
     }
 
     public void addSystemException(Throwable ex) {
+        long ts = System.currentTimeMillis();
         InstanceCommunication.FunctionStatus.ExceptionInformation info =
                 InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
-                        .setExceptionString(ex.getMessage()).setMsSinceEpoch(System.currentTimeMillis()).build();
+                        .setExceptionString(ex.getMessage()).setMsSinceEpoch(ts).build();
         latestSystemExceptions.add(info);
 
+        // report exception throw prometheus
+        if (sysExceptionRateLimiter.tryAcquire()) {
+            String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage();
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+            sysExceptions.labels(exceptionMetricsLabels).set(1.0);
+        }
     }
 
     public void incrTotalReceived() {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
index 7a6b2ca..937c273 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
@@ -28,7 +28,7 @@ public class InstanceCache {
     public final ScheduledExecutorService executor;
 
     private InstanceCache() {
-        executor = Executors.newSingleThreadScheduledExecutor();;
+        executor = Executors.newSingleThreadScheduledExecutor();
     }
 
     public static InstanceCache getInstanceCache() {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index a9e9177..d302e6a 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -148,11 +148,10 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                 instanceConfig.getFunctionDetails().getTenant(),
                 String.format("%s/%s", instanceConfig.getFunctionDetails().getTenant(),
                         instanceConfig.getFunctionDetails().getNamespace()),
-                String.format("%s/%s/%s", instanceConfig.getFunctionDetails().getTenant(),
-                        instanceConfig.getFunctionDetails().getNamespace(),
-                        instanceConfig.getFunctionDetails().getName()),
+                instanceConfig.getFunctionDetails().getName(),
                 String.valueOf(instanceConfig.getInstanceId()),
-                instanceConfig.getClusterName()
+                instanceConfig.getClusterName(),
+                FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails())
         };
 
         // Declare function local collector registry so that it will not clash with other function instances'
diff --git a/pulsar-functions/instance/src/main/python/function_stats.py b/pulsar-functions/instance/src/main/python/function_stats.py
index 3d6e216..3ea0316 100644
--- a/pulsar-functions/instance/src/main/python/function_stats.py
+++ b/pulsar-functions/instance/src/main/python/function_stats.py
@@ -20,12 +20,16 @@
 import traceback
 import time
 import util
+import sys
 
 from prometheus_client import Counter, Summary, Gauge
+from ratelimit import limits, RateLimitException
 
 # We keep track of the following metrics
 class Stats(object):
-  metrics_label_names = ['tenant', 'namespace', 'function', 'instance_id', 'cluster']
+  metrics_label_names = ['tenant', 'namespace', 'function', 'instance_id', 'cluster', 'fqfn']
+
+  exception_metrics_label_names = metrics_label_names + ['error', 'ts']
 
   PULSAR_FUNCTION_METRICS_PREFIX = "pulsar_function_"
   USER_METRIC_PREFIX = "user_metric_";
@@ -57,7 +61,6 @@ class Stats(object):
 
   stat_total_received = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_RECEIVED, 'Total number of messages received from source.', metrics_label_names)
 
-
   # 1min windowed metrics
   stat_total_processed_successfully_1min = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_SUCCESSFULLY_PROCESSED_1min,
                                               'Total number of messages processed successfully in the last 1 minute.', metrics_label_names)
@@ -74,6 +77,11 @@ class Stats(object):
   stat_total_received_1min = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_RECEIVED_1min,
                                 'Total number of messages received from source in the last 1 minute.', metrics_label_names)
 
+  # exceptions
+  user_exceptions = Gauge(PULSAR_FUNCTION_METRICS_PREFIX + 'user_exception', 'Exception from user code.', exception_metrics_label_names)
+
+  system_exceptions = Gauge(PULSAR_FUNCTION_METRICS_PREFIX + 'system_exception', 'Exception from system code.', exception_metrics_label_names)
+
   latest_user_exception = []
   latest_sys_exception = []
 
@@ -129,15 +137,15 @@ class Stats(object):
     self.stat_total_processed_successfully.labels(*self.metrics_labels).inc()
     self.stat_total_processed_successfully_1min.labels(*self.metrics_labels).inc()
 
-  def incr_total_sys_exceptions(self):
+  def incr_total_sys_exceptions(self, exception):
     self.stat_total_sys_exceptions.labels(*self.metrics_labels).inc()
     self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels).inc()
-    self.add_sys_exception()
+    self.add_sys_exception(exception)
 
-  def incr_total_user_exceptions(self):
+  def incr_total_user_exceptions(self, exception):
     self.stat_total_user_exceptions.labels(*self.metrics_labels).inc()
     self.stat_total_user_exceptions_1min.labels(*self.metrics_labels).inc()
-    self.add_user_exception()
+    self.add_user_exception(exception)
 
   def incr_total_received(self):
     self.stat_total_received.labels(*self.metrics_labels).inc()
@@ -155,16 +163,42 @@ class Stats(object):
   def set_last_invocation(self, time):
     self.stat_last_invocation.labels(*self.metrics_labels).set(time * 1000.0)
 
-  def add_user_exception(self):
-    self.latest_sys_exception.append((traceback.format_exc(), int(time.time() * 1000)))
+  def add_user_exception(self, exception):
+    error = traceback.format_exc()
+    ts = int(time.time() * 1000) if sys.version_info.major >= 3 else long(time.time() * 1000)
+    self.latest_sys_exception.append((error, ts))
     if len(self.latest_sys_exception) > 10:
       self.latest_sys_exception.pop(0)
 
-  def add_sys_exception(self):
-    self.latest_sys_exception.append((traceback.format_exc(), int(time.time() * 1000)))
+    # report exception via prometheus
+    try:
+      self.report_user_exception_prometheus(exception, ts)
+    except RateLimitException:
+      pass
+
+  @limits(calls=5, period=60)
+  def report_user_exception_prometheus(self, exception, ts):
+    exception_metric_labels = self.metrics_labels + [exception.message, str(ts)]
+    self.user_exceptions.labels(*exception_metric_labels).set(1.0)
+
+  def add_sys_exception(self, exception):
+    error = traceback.format_exc()
+    ts = int(time.time() * 1000) if sys.version_info.major >= 3 else long(time.time() * 1000)
+    self.latest_sys_exception.append((error, ts))
     if len(self.latest_sys_exception) > 10:
       self.latest_sys_exception.pop(0)
 
+    # report exception via prometheus
+    try:
+      self.report_system_exception_prometheus(exception, ts)
+    except RateLimitException:
+      pass
+
+  @limits(calls=5, period=60)
+  def report_system_exception_prometheus(self, exception, ts):
+    exception_metric_labels = self.metrics_labels + [exception.message, str(ts)]
+    self.system_exceptions.labels(*exception_metric_labels).set(1.0)
+
   def reset(self):
     self.latest_user_exception = []
     self.latest_sys_exception = []
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index 0fc3601..01593fa 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -94,8 +94,9 @@ class PythonInstance(object):
     self.secrets_provider = secrets_provider
     self.metrics_labels = [function_details.tenant,
                            "%s/%s" % (function_details.tenant, function_details.namespace),
-                           "%s/%s/%s" % (function_details.tenant, function_details.namespace, function_details.name),
-                           instance_id, cluster_name]
+                           function_details.name,
+                           instance_id, cluster_name,
+                           "%s/%s/%s" % (function_details.tenant, function_details.namespace, function_details.name)]
     self.stats = Stats(self.metrics_labels)
 
   def health_check(self):
@@ -213,7 +214,7 @@ class PythonInstance(object):
           self.stats.process_time_end()
         except Exception as e:
           Log.exception("Exception while executing user method")
-          self.stats.incr_total_user_exceptions()
+          self.stats.incr_total_user_exceptions(e)
 
         if self.log_topic_handler is not None:
           log.remove_all_handlers()
@@ -224,7 +225,7 @@ class PythonInstance(object):
 
       except Exception as e:
         Log.error("Uncaught exception in Python instance: %s" % e);
-        self.stats.incr_total_sys_exceptions()
+        self.stats.incr_total_sys_exceptions(e)
 
   def done_producing(self, consumer, orig_message, result, sent_message):
     if result == pulsar.Result.Ok and self.auto_ack and self.atleast_once: