You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/09/23 00:19:49 UTC

[pulsar] branch master updated: Enable worker JVM metrics to be reported via Prometheus (#8097)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 00ea30d  Enable worker JVM metrics to be reported via Prometheus (#8097)
00ea30d is described below

commit 00ea30d90aa47bd5843b6a900c0abf3144a480e3
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue Sep 22 17:19:22 2020 -0700

    Enable worker JVM metrics to be reported via Prometheus (#8097)
    
    Co-authored-by: Jerry Peng <je...@splunk.com>
---
 .../org/apache/pulsar/functions/worker/Worker.java |  2 +-
 .../pulsar/functions/worker/WorkerService.java     | 10 ++++++---
 .../functions/worker/WorkerStatsManager.java       | 26 +++++++++++++++++++++-
 3 files changed, 33 insertions(+), 5 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index 968b8b4..239fb27 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -62,7 +62,7 @@ public class Worker {
 
     public Worker(WorkerConfig workerConfig) {
         this.workerConfig = workerConfig;
-        this.workerService = new WorkerService(workerConfig);
+        this.workerService = new WorkerService(workerConfig, true);
         this.errorNotifier = ErrorNotifier.getDefaultImpl();
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index c924e80..4b7e23c 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -76,12 +76,16 @@ public class WorkerService {
     private FunctionAssignmentTailer functionAssignmentTailer;
     private final WorkerStatsManager workerStatsManager;
 
-    public WorkerService(WorkerConfig workerConfig) {
+    public WorkerService(WorkerConfig workerConfig, boolean runAsStandalone) {
         this.workerConfig = workerConfig;
         this.statsUpdater = Executors
-                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-stats-updater"));
+          .newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-stats-updater"));
         this.metricsGenerator = new MetricsGenerator(this.statsUpdater, workerConfig);
-        workerStatsManager = new WorkerStatsManager(workerConfig);
+        this.workerStatsManager = new WorkerStatsManager(workerConfig, runAsStandalone);
+    }
+
+    public WorkerService(WorkerConfig workerConfig) {
+        this(workerConfig, false);
     }
 
     public void start(URI dlogUri,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
index 5d48959..703b131 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
@@ -19,9 +19,13 @@
 
 package org.apache.pulsar.functions.worker;
 
+import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
+
+import io.netty.util.internal.PlatformDependent;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Gauge;
 import io.prometheus.client.Summary;
+import io.prometheus.client.hotspot.DefaultExports;
 import lombok.Setter;
 import org.apache.pulsar.functions.instance.stats.PrometheusTextFormat;
 import org.apache.pulsar.functions.proto.Function;
@@ -32,6 +36,10 @@ import java.util.List;
 
 public class WorkerStatsManager {
 
+  static {
+    DefaultExports.initialize();
+  }
+
   private static final String PULSAR_FUNCTION_WORKER_METRICS_PREFIX = "pulsar_function_worker_";
   private static final String START_UP_TIME = "start_up_time_ms";
   private static final String INSTANCE_COUNT = "instance_count";
@@ -79,7 +87,7 @@ public class WorkerStatsManager {
   private final Summary.Child _stopInstanceProcessTime;
   private final Summary.Child _startInstanceProcessTime;
 
-  public WorkerStatsManager(WorkerConfig workerConfig) {
+  public WorkerStatsManager(WorkerConfig workerConfig, boolean runAsStandalone) {
 
     metricsLabels = new String[]{workerConfig.getPulsarFunctionsCluster()};
 
@@ -156,6 +164,22 @@ public class WorkerStatsManager {
       .quantile(1, 0.01)
       .register(collectorRegistry);
     _startInstanceProcessTime = startInstanceProcessTime.labels(metricsLabels);
+
+    if (runAsStandalone) {
+      Gauge.build("jvm_memory_direct_bytes_used", "-").create().setChild(new Gauge.Child() {
+        @Override
+        public double get() {
+          return getJvmDirectMemoryUsed();
+        }
+      }).register(CollectorRegistry.defaultRegistry);
+
+      Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new Gauge.Child() {
+        @Override
+        public double get() {
+          return PlatformDependent.maxDirectMemory();
+        }
+      }).register(CollectorRegistry.defaultRegistry);
+    }
   }
 
   private Long startupTimeStart;