You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/09/23 00:19:49 UTC
[pulsar] branch master updated: Enable worker JVM metrics to be reported via Prometheus (#8097)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 00ea30d Enable worker JVM metrics to be reported via Prometheus (#8097)
00ea30d is described below
commit 00ea30d90aa47bd5843b6a900c0abf3144a480e3
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue Sep 22 17:19:22 2020 -0700
Enable worker JVM metrics to be reported via Prometheus (#8097)
Co-authored-by: Jerry Peng <je...@splunk.com>
---
.../org/apache/pulsar/functions/worker/Worker.java | 2 +-
.../pulsar/functions/worker/WorkerService.java | 10 ++++++---
.../functions/worker/WorkerStatsManager.java | 26 +++++++++++++++++++++-
3 files changed, 33 insertions(+), 5 deletions(-)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index 968b8b4..239fb27 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -62,7 +62,7 @@ public class Worker {
public Worker(WorkerConfig workerConfig) {
this.workerConfig = workerConfig;
- this.workerService = new WorkerService(workerConfig);
+ this.workerService = new WorkerService(workerConfig, true);
this.errorNotifier = ErrorNotifier.getDefaultImpl();
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index c924e80..4b7e23c 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -76,12 +76,16 @@ public class WorkerService {
private FunctionAssignmentTailer functionAssignmentTailer;
private final WorkerStatsManager workerStatsManager;
- public WorkerService(WorkerConfig workerConfig) {
+ public WorkerService(WorkerConfig workerConfig, boolean runAsStandalone) {
this.workerConfig = workerConfig;
this.statsUpdater = Executors
- .newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-stats-updater"));
+ .newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-stats-updater"));
this.metricsGenerator = new MetricsGenerator(this.statsUpdater, workerConfig);
- workerStatsManager = new WorkerStatsManager(workerConfig);
+ this.workerStatsManager = new WorkerStatsManager(workerConfig, runAsStandalone);
+ }
+
+ public WorkerService(WorkerConfig workerConfig) {
+ this(workerConfig, false);
}
public void start(URI dlogUri,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
index 5d48959..703b131 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
@@ -19,9 +19,13 @@
package org.apache.pulsar.functions.worker;
+import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
+
+import io.netty.util.internal.PlatformDependent;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
import io.prometheus.client.Summary;
+import io.prometheus.client.hotspot.DefaultExports;
import lombok.Setter;
import org.apache.pulsar.functions.instance.stats.PrometheusTextFormat;
import org.apache.pulsar.functions.proto.Function;
@@ -32,6 +36,10 @@ import java.util.List;
public class WorkerStatsManager {
+ static {
+ DefaultExports.initialize();
+ }
+
private static final String PULSAR_FUNCTION_WORKER_METRICS_PREFIX = "pulsar_function_worker_";
private static final String START_UP_TIME = "start_up_time_ms";
private static final String INSTANCE_COUNT = "instance_count";
@@ -79,7 +87,7 @@ public class WorkerStatsManager {
private final Summary.Child _stopInstanceProcessTime;
private final Summary.Child _startInstanceProcessTime;
- public WorkerStatsManager(WorkerConfig workerConfig) {
+ public WorkerStatsManager(WorkerConfig workerConfig, boolean runAsStandalone) {
metricsLabels = new String[]{workerConfig.getPulsarFunctionsCluster()};
@@ -156,6 +164,22 @@ public class WorkerStatsManager {
.quantile(1, 0.01)
.register(collectorRegistry);
_startInstanceProcessTime = startInstanceProcessTime.labels(metricsLabels);
+
+ if (runAsStandalone) {
+ Gauge.build("jvm_memory_direct_bytes_used", "-").create().setChild(new Gauge.Child() {
+ @Override
+ public double get() {
+ return getJvmDirectMemoryUsed();
+ }
+ }).register(CollectorRegistry.defaultRegistry);
+
+ Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new Gauge.Child() {
+ @Override
+ public double get() {
+ return PlatformDependent.maxDirectMemory();
+ }
+ }).register(CollectorRegistry.defaultRegistry);
+ }
}
private Long startupTimeStart;