You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/12/19 02:44:05 UTC
[pulsar] branch master updated: optimize java functions stats code
(#3209)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 83cf25c optimize java functions stats code (#3209)
83cf25c is described below
commit 83cf25c9699be808de258e308d0c31efc53deed8
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue Dec 18 18:43:59 2018 -0800
optimize java functions stats code (#3209)
### Motivation
Optimized the stats collection code for java functions to reduce stats collection foot print both memory and cpu. Reduced the number of objects allocated by 20% in java functions
---
.../functions/instance/FunctionStatsManager.java | 127 ++++++++++++++-------
.../functions/instance/JavaInstanceRunnable.java | 4 +-
2 files changed, 89 insertions(+), 42 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
index 96941f5..72b792e 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
@@ -118,6 +118,23 @@ public class FunctionStatsManager implements AutoCloseable {
final Gauge sinkExceptions;
+ // As an optimization
+ private final Counter.Child _statTotalProcessedSuccessfully;
+ private final Counter.Child _statTotalSysExceptions;
+ private final Counter.Child _statTotalUserExceptions;
+ private final Counter.Child _statTotalSourceExceptions;
+ private final Counter.Child _statTotalSinkExceptions;
+ private final Summary.Child _statProcessLatency;
+ private final Gauge.Child _statlastInvocation;
+ private final Counter.Child _statTotalRecordsRecieved;
+ private Counter.Child _statTotalProcessedSuccessfully1min;
+ private Counter.Child _statTotalSysExceptions1min;
+ private Counter.Child _statTotalUserExceptions1min;
+ private Counter.Child _statTotalSourceExceptions1min;
+ private Counter.Child _statTotalSinkExceptions1min;
+ private Summary.Child _statProcessLatency1min;
+ private Counter.Child _statTotalRecordsRecieved1min;
+
private String[] metricsLabels;
private ScheduledFuture<?> scheduledFuture;
@@ -152,30 +169,35 @@ public class FunctionStatsManager implements AutoCloseable {
.help("Total number of messages processed successfully.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
+ _statTotalProcessedSuccessfully = statTotalProcessedSuccessfully.labels(metricsLabels);
statTotalSysExceptions = Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL)
.help("Total number of system exceptions.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
+ _statTotalSysExceptions = statTotalSysExceptions.labels(metricsLabels);
statTotalUserExceptions = Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL)
.help("Total number of user exceptions.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
+ _statTotalUserExceptions = statTotalUserExceptions.labels(metricsLabels);
statTotalSourceExceptions = Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL)
.help("Total number of source exceptions.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
+ _statTotalSourceExceptions = statTotalSourceExceptions.labels(metricsLabels);
statTotalSinkExceptions = Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL)
.help("Total number of sink exceptions.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
+ _statTotalSinkExceptions = statTotalSinkExceptions.labels(metricsLabels);
statProcessLatency = Summary.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS)
@@ -186,48 +208,56 @@ public class FunctionStatsManager implements AutoCloseable {
.quantile(0.999, 0.01)
.labelNames(metricsLabelNames)
.register(collectorRegistry);
+ _statProcessLatency = statProcessLatency.labels(metricsLabels);
statlastInvocation = Gauge.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + LAST_INVOCATION)
.help("The timestamp of the last invocation of the function.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
+ _statlastInvocation = statlastInvocation.labels(metricsLabels);
statTotalRecordsRecieved = Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL)
.help("Total number of messages received from source.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
+ _statTotalRecordsRecieved = statTotalRecordsRecieved.labels(metricsLabels);
statTotalProcessedSuccessfully1min = Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL_1min)
.help("Total number of messages processed successfully in the last 1 minute.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
+ _statTotalProcessedSuccessfully1min = statTotalProcessedSuccessfully1min.labels(metricsLabels);
statTotalSysExceptions1min = Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min)
.help("Total number of system exceptions in the last 1 minute.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
+ _statTotalSysExceptions1min = statTotalSysExceptions1min.labels(metricsLabels);
statTotalUserExceptions1min = Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL_1min)
.help("Total number of user exceptions in the last 1 minute.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
+ _statTotalUserExceptions1min = statTotalUserExceptions1min.labels(metricsLabels);
statTotalSourceExceptions1min = Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL_1min)
.help("Total number of source exceptions in the last 1 minute.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
+ _statTotalSourceExceptions1min = statTotalSourceExceptions1min.labels(metricsLabels);
statTotalSinkExceptions1min = Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL_1min)
.help("Total number of sink exceptions in the last 1 minute.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
+ _statTotalSinkExceptions1min = statTotalSinkExceptions1min.labels(metricsLabels);
statProcessLatency1min = Summary.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS_1min)
@@ -238,19 +268,20 @@ public class FunctionStatsManager implements AutoCloseable {
.quantile(0.999, 0.01)
.labelNames(metricsLabelNames)
.register(collectorRegistry);
+ _statProcessLatency1min = statProcessLatency1min.labels(metricsLabels);
statTotalRecordsRecieved1min = Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL_1min)
.help("Total number of messages received from source in the last 1 minute.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
+ _statTotalRecordsRecieved1min = statTotalRecordsRecieved1min.labels(metricsLabels);
userExceptions = Gauge.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + "user_exception")
.labelNames(exceptionMetricsLabelNames)
.help("Exception from user code.")
.register(collectorRegistry);
-
sysExceptions = Gauge.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + "system_exception")
.labelNames(exceptionMetricsLabelNames)
@@ -352,41 +383,41 @@ public class FunctionStatsManager implements AutoCloseable {
}
public void incrTotalReceived() {
- statTotalRecordsRecieved.labels(metricsLabels).inc();
- statTotalRecordsRecieved1min.labels(metricsLabels).inc();
+ _statTotalRecordsRecieved.inc();
+ _statTotalRecordsRecieved1min.inc();
}
public void incrTotalProcessedSuccessfully() {
- statTotalProcessedSuccessfully.labels(metricsLabels).inc();
- statTotalProcessedSuccessfully1min.labels(metricsLabels).inc();
+ _statTotalProcessedSuccessfully.inc();
+ _statTotalProcessedSuccessfully1min.inc();
}
public void incrSysExceptions(Throwable sysException) {
- statTotalSysExceptions.labels(metricsLabels).inc();
- statTotalSysExceptions1min.labels(metricsLabels).inc();
+ _statTotalSysExceptions.inc();
+ _statTotalSysExceptions1min.inc();
addSystemException(sysException);
}
public void incrUserExceptions(Exception userException) {
- statTotalUserExceptions.labels(metricsLabels).inc();
- statTotalUserExceptions1min.labels(metricsLabels).inc();
+ _statTotalUserExceptions.inc();
+ _statTotalUserExceptions1min.inc();
addUserException(userException);
}
public void incrSourceExceptions(Exception userException) {
- statTotalSourceExceptions.labels(metricsLabels).inc();
- statTotalSourceExceptions1min.labels(metricsLabels).inc();
+ _statTotalSourceExceptions.inc();
+ _statTotalSourceExceptions1min.inc();
addSourceException(userException);
}
public void incrSinkExceptions(Exception userException) {
- statTotalSinkExceptions.labels(metricsLabels).inc();
- statTotalSinkExceptions1min.labels(metricsLabels).inc();
+ _statTotalSinkExceptions.inc();
+ _statTotalSinkExceptions1min.inc();
addSinkException(userException);
}
public void setLastInvocation(long ts) {
- statlastInvocation.labels(metricsLabels).set(ts);
+ _statlastInvocation.set(ts);
}
private Long processTimeStart;
@@ -397,113 +428,127 @@ public class FunctionStatsManager implements AutoCloseable {
public void processTimeEnd() {
if (processTimeStart != null) {
double endTimeMs = ((double) System.nanoTime() - processTimeStart) / 1.0E6D;
- statProcessLatency.labels(metricsLabels).observe(endTimeMs);
- statProcessLatency1min.labels(metricsLabels).observe(endTimeMs);
+ _statProcessLatency.observe(endTimeMs);
+ _statProcessLatency1min.observe(endTimeMs);
}
}
public double getTotalProcessedSuccessfully() {
- return statTotalProcessedSuccessfully.labels(metricsLabels).get();
+ return _statTotalProcessedSuccessfully.get();
}
public double getTotalRecordsReceived() {
- return statTotalRecordsRecieved.labels(metricsLabels).get();
+ return _statTotalRecordsRecieved.get();
}
public double getTotalSysExceptions() {
- return statTotalSysExceptions.labels(metricsLabels).get();
+ return _statTotalSysExceptions.get();
}
public double getTotalUserExceptions() {
- return statTotalUserExceptions.labels(metricsLabels).get();
+ return _statTotalUserExceptions.get();
}
public double getTotalSourceExceptions() {
- return statTotalSourceExceptions.labels(metricsLabels).get();
+ return _statTotalSourceExceptions.get();
}
public double getTotalSinkExceptions() {
- return statTotalSinkExceptions.labels(metricsLabels).get();
+ return _statTotalSinkExceptions.get();
}
public double getLastInvocation() {
- return statlastInvocation.labels(metricsLabels).get();
+ return _statlastInvocation.get();
}
public double getAvgProcessLatency() {
- return statProcessLatency.labels(metricsLabels).get().count <= 0.0
- ? 0 : statProcessLatency.labels(metricsLabels).get().sum / statProcessLatency.labels(metricsLabels).get().count;
+ return _statProcessLatency.get().count <= 0.0
+ ? 0 : _statProcessLatency.get().sum / _statProcessLatency.get().count;
}
public double getProcessLatency50P() {
- return statProcessLatency.labels(metricsLabels).get().quantiles.get(0.5);
+ return _statProcessLatency.get().quantiles.get(0.5);
}
public double getProcessLatency90P() {
- return statProcessLatency.labels(metricsLabels).get().quantiles.get(0.9);
+ return _statProcessLatency.get().quantiles.get(0.9);
}
public double getProcessLatency99P() {
- return statProcessLatency.labels(metricsLabels).get().quantiles.get(0.99);
+ return _statProcessLatency.get().quantiles.get(0.99);
}
public double getProcessLatency99_9P() {
- return statProcessLatency.labels(metricsLabels).get().quantiles.get(0.999);
+ return _statProcessLatency.get().quantiles.get(0.999);
}
public double getTotalProcessedSuccessfully1min() {
- return statTotalProcessedSuccessfully1min.labels(metricsLabels).get();
+ return _statTotalProcessedSuccessfully1min.get();
}
public double getTotalRecordsReceived1min() {
- return statTotalRecordsRecieved1min.labels(metricsLabels).get();
+ return _statTotalRecordsRecieved1min.get();
}
public double getTotalSysExceptions1min() {
- return statTotalSysExceptions1min.labels(metricsLabels).get();
+ return _statTotalSysExceptions1min.get();
}
public double getTotalUserExceptions1min() {
- return statTotalUserExceptions1min.labels(metricsLabels).get();
+ return _statTotalUserExceptions1min.get();
}
public double getTotalSourceExceptions1min() {
- return statTotalSourceExceptions1min.labels(metricsLabels).get();
+ return _statTotalSourceExceptions1min.get();
}
public double getTotalSinkExceptions1min() {
- return statTotalSinkExceptions1min.labels(metricsLabels).get();
+ return _statTotalSinkExceptions1min.get();
}
public double getAvgProcessLatency1min() {
- return statProcessLatency1min.labels(metricsLabels).get().count <= 0.0
- ? 0 : statProcessLatency1min.labels(metricsLabels).get().sum / statProcessLatency1min.labels(metricsLabels).get().count;
+ return _statProcessLatency1min.get().count <= 0.0
+ ? 0 : _statProcessLatency1min.get().sum / _statProcessLatency1min.get().count;
}
public double getProcessLatency50P1min() {
- return statProcessLatency1min.labels(metricsLabels).get().quantiles.get(0.5);
+ return _statProcessLatency1min.get().quantiles.get(0.5);
}
public double getProcessLatency90P1min() {
- return statProcessLatency1min.labels(metricsLabels).get().quantiles.get(0.9);
+ return _statProcessLatency1min.get().quantiles.get(0.9);
}
public double getProcessLatency99P1min() {
- return statProcessLatency1min.labels(metricsLabels).get().quantiles.get(0.99);
+ return _statProcessLatency1min.get().quantiles.get(0.99);
}
public double getProcessLatency99_9P1min() {
- return statProcessLatency1min.labels(metricsLabels).get().quantiles.get(0.999);
+ return _statProcessLatency1min.get().quantiles.get(0.999);
}
public void reset() {
statTotalProcessedSuccessfully1min.clear();
+ _statTotalProcessedSuccessfully1min = statTotalProcessedSuccessfully1min.labels(metricsLabels);
+
statTotalSysExceptions1min.clear();
+ _statTotalSysExceptions1min = statTotalSysExceptions1min.labels(metricsLabels);
+
statTotalUserExceptions1min.clear();
+ _statTotalUserExceptions1min = statTotalUserExceptions1min.labels(metricsLabels);
+
statTotalSourceExceptions1min.clear();
+ _statTotalSourceExceptions1min = statTotalSourceExceptions1min.labels(metricsLabels);
+
statTotalSinkExceptions1min.clear();
+ _statTotalSinkExceptions1min = statTotalSinkExceptions1min.labels(metricsLabels);
+
statProcessLatency1min.clear();
+ _statProcessLatency1min = statProcessLatency1min.labels(metricsLabels);
+
statTotalRecordsRecieved1min.clear();
+ _statTotalRecordsRecieved1min = statTotalRecordsRecieved1min.labels(metricsLabels);
+
latestUserExceptions.clear();
latestSystemExceptions.clear();
latestSourceExceptions.clear();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 9e727a5..e858e90 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -272,7 +272,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
instanceConfig.getFunctionDetails().getName(),
instanceConfig.getInstanceId()), t);
deathException = t;
- stats.incrSysExceptions(t);
+ if (stats != null) {
+ stats.incrSysExceptions(t);
+ }
return;
} finally {
log.info("Closing instance");