You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/12/19 02:44:05 UTC

[pulsar] branch master updated: optimize java functions stats code (#3209)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 83cf25c  optimize java functions stats code (#3209)
83cf25c is described below

commit 83cf25c9699be808de258e308d0c31efc53deed8
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue Dec 18 18:43:59 2018 -0800

    optimize java functions stats code (#3209)
    
    ### Motivation
    
    Optimized the stats collection code for java functions to reduce stats collection foot print both memory and cpu.  Reduced the number of objects allocated by 20% in java functions
---
 .../functions/instance/FunctionStatsManager.java   | 127 ++++++++++++++-------
 .../functions/instance/JavaInstanceRunnable.java   |   4 +-
 2 files changed, 89 insertions(+), 42 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
index 96941f5..72b792e 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
@@ -118,6 +118,23 @@ public class FunctionStatsManager implements AutoCloseable {
 
     final Gauge sinkExceptions;
 
+    // As an optimization
+    private final Counter.Child _statTotalProcessedSuccessfully;
+    private final Counter.Child _statTotalSysExceptions;
+    private final Counter.Child _statTotalUserExceptions;
+    private final Counter.Child _statTotalSourceExceptions;
+    private final Counter.Child _statTotalSinkExceptions;
+    private final Summary.Child _statProcessLatency;
+    private final Gauge.Child _statlastInvocation;
+    private final Counter.Child _statTotalRecordsRecieved;
+    private Counter.Child _statTotalProcessedSuccessfully1min;
+    private Counter.Child _statTotalSysExceptions1min;
+    private Counter.Child _statTotalUserExceptions1min;
+    private Counter.Child _statTotalSourceExceptions1min;
+    private Counter.Child _statTotalSinkExceptions1min;
+    private Summary.Child _statProcessLatency1min;
+    private Counter.Child _statTotalRecordsRecieved1min;
+
     private String[] metricsLabels;
 
     private ScheduledFuture<?> scheduledFuture;
@@ -152,30 +169,35 @@ public class FunctionStatsManager implements AutoCloseable {
                 .help("Total number of messages processed successfully.")
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
+        _statTotalProcessedSuccessfully = statTotalProcessedSuccessfully.labels(metricsLabels);
 
         statTotalSysExceptions = Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL)
                 .help("Total number of system exceptions.")
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
+        _statTotalSysExceptions = statTotalSysExceptions.labels(metricsLabels);
 
         statTotalUserExceptions = Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL)
                 .help("Total number of user exceptions.")
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
+        _statTotalUserExceptions = statTotalUserExceptions.labels(metricsLabels);
 
         statTotalSourceExceptions = Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL)
                 .help("Total number of source exceptions.")
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
+        _statTotalSourceExceptions = statTotalSourceExceptions.labels(metricsLabels);
 
         statTotalSinkExceptions = Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL)
                 .help("Total number of sink exceptions.")
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
+        _statTotalSinkExceptions = statTotalSinkExceptions.labels(metricsLabels);
 
         statProcessLatency = Summary.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS)
@@ -186,48 +208,56 @@ public class FunctionStatsManager implements AutoCloseable {
                 .quantile(0.999, 0.01)
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
+        _statProcessLatency = statProcessLatency.labels(metricsLabels);
 
         statlastInvocation = Gauge.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + LAST_INVOCATION)
                 .help("The timestamp of the last invocation of the function.")
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
+        _statlastInvocation = statlastInvocation.labels(metricsLabels);
 
         statTotalRecordsRecieved = Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL)
                 .help("Total number of messages received from source.")
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
+        _statTotalRecordsRecieved = statTotalRecordsRecieved.labels(metricsLabels);
 
         statTotalProcessedSuccessfully1min = Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL_1min)
                 .help("Total number of messages processed successfully in the last 1 minute.")
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
+        _statTotalProcessedSuccessfully1min = statTotalProcessedSuccessfully1min.labels(metricsLabels);
 
         statTotalSysExceptions1min = Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min)
                 .help("Total number of system exceptions in the last 1 minute.")
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
+        _statTotalSysExceptions1min = statTotalSysExceptions1min.labels(metricsLabels);
 
         statTotalUserExceptions1min = Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL_1min)
                 .help("Total number of user exceptions in the last 1 minute.")
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
+        _statTotalUserExceptions1min = statTotalUserExceptions1min.labels(metricsLabels);
 
         statTotalSourceExceptions1min = Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL_1min)
                 .help("Total number of source exceptions in the last 1 minute.")
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
+        _statTotalSourceExceptions1min = statTotalSourceExceptions1min.labels(metricsLabels);
 
         statTotalSinkExceptions1min = Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL_1min)
                 .help("Total number of sink exceptions in the last 1 minute.")
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
+        _statTotalSinkExceptions1min = statTotalSinkExceptions1min.labels(metricsLabels);
 
         statProcessLatency1min = Summary.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS_1min)
@@ -238,19 +268,20 @@ public class FunctionStatsManager implements AutoCloseable {
                 .quantile(0.999, 0.01)
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
+        _statProcessLatency1min = statProcessLatency1min.labels(metricsLabels);
 
         statTotalRecordsRecieved1min = Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL_1min)
                 .help("Total number of messages received from source in the last 1 minute.")
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
+        _statTotalRecordsRecieved1min = statTotalRecordsRecieved1min.labels(metricsLabels);
 
         userExceptions = Gauge.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + "user_exception")
                 .labelNames(exceptionMetricsLabelNames)
                 .help("Exception from user code.")
                 .register(collectorRegistry);
-
         sysExceptions = Gauge.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + "system_exception")
                 .labelNames(exceptionMetricsLabelNames)
@@ -352,41 +383,41 @@ public class FunctionStatsManager implements AutoCloseable {
     }
 
     public void incrTotalReceived() {
-        statTotalRecordsRecieved.labels(metricsLabels).inc();
-        statTotalRecordsRecieved1min.labels(metricsLabels).inc();
+        _statTotalRecordsRecieved.inc();
+        _statTotalRecordsRecieved1min.inc();
     }
 
     public void incrTotalProcessedSuccessfully() {
-        statTotalProcessedSuccessfully.labels(metricsLabels).inc();
-        statTotalProcessedSuccessfully1min.labels(metricsLabels).inc();
+        _statTotalProcessedSuccessfully.inc();
+        _statTotalProcessedSuccessfully1min.inc();
     }
 
     public void incrSysExceptions(Throwable sysException) {
-        statTotalSysExceptions.labels(metricsLabels).inc();
-        statTotalSysExceptions1min.labels(metricsLabels).inc();
+        _statTotalSysExceptions.inc();
+        _statTotalSysExceptions1min.inc();
         addSystemException(sysException);
     }
 
     public void incrUserExceptions(Exception userException) {
-        statTotalUserExceptions.labels(metricsLabels).inc();
-        statTotalUserExceptions1min.labels(metricsLabels).inc();
+        _statTotalUserExceptions.inc();
+        _statTotalUserExceptions1min.inc();
         addUserException(userException);
     }
 
     public void incrSourceExceptions(Exception userException) {
-        statTotalSourceExceptions.labels(metricsLabels).inc();
-        statTotalSourceExceptions1min.labels(metricsLabels).inc();
+        _statTotalSourceExceptions.inc();
+        _statTotalSourceExceptions1min.inc();
         addSourceException(userException);
     }
 
     public void incrSinkExceptions(Exception userException) {
-        statTotalSinkExceptions.labels(metricsLabels).inc();
-        statTotalSinkExceptions1min.labels(metricsLabels).inc();
+        _statTotalSinkExceptions.inc();
+        _statTotalSinkExceptions1min.inc();
         addSinkException(userException);
     }
 
     public void setLastInvocation(long ts) {
-        statlastInvocation.labels(metricsLabels).set(ts);
+        _statlastInvocation.set(ts);
     }
 
     private Long processTimeStart;
@@ -397,113 +428,127 @@ public class FunctionStatsManager implements AutoCloseable {
     public void processTimeEnd() {
         if (processTimeStart != null) {
             double endTimeMs = ((double) System.nanoTime() - processTimeStart) / 1.0E6D;
-            statProcessLatency.labels(metricsLabels).observe(endTimeMs);
-            statProcessLatency1min.labels(metricsLabels).observe(endTimeMs);
+            _statProcessLatency.observe(endTimeMs);
+            _statProcessLatency1min.observe(endTimeMs);
         }
     }
 
     public double getTotalProcessedSuccessfully() {
-        return statTotalProcessedSuccessfully.labels(metricsLabels).get();
+        return _statTotalProcessedSuccessfully.get();
     }
 
     public double getTotalRecordsReceived() {
-        return statTotalRecordsRecieved.labels(metricsLabels).get();
+        return _statTotalRecordsRecieved.get();
     }
 
     public double getTotalSysExceptions() {
-        return statTotalSysExceptions.labels(metricsLabels).get();
+        return _statTotalSysExceptions.get();
     }
 
     public double getTotalUserExceptions() {
-        return statTotalUserExceptions.labels(metricsLabels).get();
+        return _statTotalUserExceptions.get();
     }
 
     public double getTotalSourceExceptions() {
-        return statTotalSourceExceptions.labels(metricsLabels).get();
+        return _statTotalSourceExceptions.get();
     }
 
     public double getTotalSinkExceptions() {
-        return statTotalSinkExceptions.labels(metricsLabels).get();
+        return _statTotalSinkExceptions.get();
     }
 
     public double getLastInvocation() {
-        return statlastInvocation.labels(metricsLabels).get();
+        return _statlastInvocation.get();
     }
 
     public double getAvgProcessLatency() {
-        return statProcessLatency.labels(metricsLabels).get().count <= 0.0
-                ? 0 : statProcessLatency.labels(metricsLabels).get().sum / statProcessLatency.labels(metricsLabels).get().count;
+        return _statProcessLatency.get().count <= 0.0
+                ? 0 : _statProcessLatency.get().sum / _statProcessLatency.get().count;
     }
 
     public double getProcessLatency50P() {
-        return statProcessLatency.labels(metricsLabels).get().quantiles.get(0.5);
+        return _statProcessLatency.get().quantiles.get(0.5);
     }
 
     public double getProcessLatency90P() {
-        return statProcessLatency.labels(metricsLabels).get().quantiles.get(0.9);
+        return _statProcessLatency.get().quantiles.get(0.9);
     }
 
     public double getProcessLatency99P() {
-        return statProcessLatency.labels(metricsLabels).get().quantiles.get(0.99);
+        return _statProcessLatency.get().quantiles.get(0.99);
     }
 
     public double getProcessLatency99_9P() {
-        return statProcessLatency.labels(metricsLabels).get().quantiles.get(0.999);
+        return _statProcessLatency.get().quantiles.get(0.999);
     }
 
     public double getTotalProcessedSuccessfully1min() {
-        return statTotalProcessedSuccessfully1min.labels(metricsLabels).get();
+        return _statTotalProcessedSuccessfully1min.get();
     }
 
     public double getTotalRecordsReceived1min() {
-        return statTotalRecordsRecieved1min.labels(metricsLabels).get();
+        return _statTotalRecordsRecieved1min.get();
     }
 
     public double getTotalSysExceptions1min() {
-        return statTotalSysExceptions1min.labels(metricsLabels).get();
+        return _statTotalSysExceptions1min.get();
     }
 
     public double getTotalUserExceptions1min() {
-        return statTotalUserExceptions1min.labels(metricsLabels).get();
+        return _statTotalUserExceptions1min.get();
     }
 
     public double getTotalSourceExceptions1min() {
-        return statTotalSourceExceptions1min.labels(metricsLabels).get();
+        return _statTotalSourceExceptions1min.get();
     }
 
     public double getTotalSinkExceptions1min() {
-        return statTotalSinkExceptions1min.labels(metricsLabels).get();
+        return _statTotalSinkExceptions1min.get();
     }
 
     public double getAvgProcessLatency1min() {
-        return statProcessLatency1min.labels(metricsLabels).get().count <= 0.0
-                ? 0 : statProcessLatency1min.labels(metricsLabels).get().sum / statProcessLatency1min.labels(metricsLabels).get().count;
+        return _statProcessLatency1min.get().count <= 0.0
+                ? 0 : _statProcessLatency1min.get().sum / _statProcessLatency1min.get().count;
     }
 
     public double getProcessLatency50P1min() {
-        return statProcessLatency1min.labels(metricsLabels).get().quantiles.get(0.5);
+        return _statProcessLatency1min.get().quantiles.get(0.5);
     }
 
     public double getProcessLatency90P1min() {
-        return statProcessLatency1min.labels(metricsLabels).get().quantiles.get(0.9);
+        return _statProcessLatency1min.get().quantiles.get(0.9);
     }
 
     public double getProcessLatency99P1min() {
-        return statProcessLatency1min.labels(metricsLabels).get().quantiles.get(0.99);
+        return _statProcessLatency1min.get().quantiles.get(0.99);
     }
 
     public double getProcessLatency99_9P1min() {
-        return statProcessLatency1min.labels(metricsLabels).get().quantiles.get(0.999);
+        return _statProcessLatency1min.get().quantiles.get(0.999);
     }
 
     public void reset() {
         statTotalProcessedSuccessfully1min.clear();
+        _statTotalProcessedSuccessfully1min = statTotalProcessedSuccessfully1min.labels(metricsLabels);
+
         statTotalSysExceptions1min.clear();
+        _statTotalSysExceptions1min = statTotalSysExceptions1min.labels(metricsLabels);
+
         statTotalUserExceptions1min.clear();
+        _statTotalUserExceptions1min = statTotalUserExceptions1min.labels(metricsLabels);
+
         statTotalSourceExceptions1min.clear();
+        _statTotalSourceExceptions1min = statTotalSourceExceptions1min.labels(metricsLabels);
+
         statTotalSinkExceptions1min.clear();
+        _statTotalSinkExceptions1min = statTotalSinkExceptions1min.labels(metricsLabels);
+
         statProcessLatency1min.clear();
+        _statProcessLatency1min = statProcessLatency1min.labels(metricsLabels);
+
         statTotalRecordsRecieved1min.clear();
+        _statTotalRecordsRecieved1min = statTotalRecordsRecieved1min.labels(metricsLabels);
+
         latestUserExceptions.clear();
         latestSystemExceptions.clear();
         latestSourceExceptions.clear();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 9e727a5..e858e90 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -272,7 +272,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                     instanceConfig.getFunctionDetails().getName(),
                     instanceConfig.getInstanceId()), t);
             deathException = t;
-            stats.incrSysExceptions(t);
+            if (stats != null) {
+                stats.incrSysExceptions(t);
+            }
             return;
         } finally {
             log.info("Closing instance");