You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/07 00:53:14 UTC

[pulsar] branch master updated: [functions] Provide more options for func worker prometheus metrics (#6801)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ee0ba4c  [functions] Provide more options for func worker prometheus metrics (#6801)
ee0ba4c is described below

commit ee0ba4c745e0e829f48d0ddb3be70db8a1b0bec5
Author: Addison Higham <ad...@gmail.com>
AuthorDate: Sat Jun 6 18:53:01 2020 -0600

    [functions] Provide more options for func worker prometheus metrics (#6801)
    
    The broker and proxy both allow for hitting the metrics endpoint without auth. The functions
    worker should allow that to be configurable as well. This adds an option that
    allows the metrics endpoint to be hit without auth.
    
    Additionally, the functions worker doesn't expose the default prometheus
    metrics (such as JVM info, etc).
    
    This commit implements that and adds an option to enable it.
    
    Co-authored-by: Addison Higham <ah...@instructure.com>
---
 .../pulsar/functions/worker/WorkerConfig.java      | 11 ++++++
 .../pulsar/functions/worker/rest/WorkerServer.java | 11 ++++--
 .../worker/rest/api/FunctionsMetricsResource.java  | 39 ++++++++++++++++++++--
 3 files changed, 57 insertions(+), 4 deletions(-)

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 8e256a6..c1280ca 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -103,6 +103,17 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
     )
     private Integer workerPortTls;
     @FieldContext(
+            category = CATEGORY_WORKER,
+            doc = "Whether the '/metrics' endpoint requires authentication. Defaults to true."
+                    + "'authenticationEnabled' must also be set for this to take effect."
+    )
+    private boolean authenticateMetricsEndpoint = true;
+    @FieldContext(
+            category = CATEGORY_WORKER,
+            doc = "Whether the '/metrics' endpoint should return default prometheus metrics. Defaults to false."
+    )
+    private boolean includeStandardPrometheusMetrics = false;
+    @FieldContext(
         category = CATEGORY_WORKER,
         doc = "Classname of Pluggable JVM GC metrics logger that can log GC specific metrics")
     private String jvmGCMetricsLoggerClassName;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 6f1fb9b..56ee46c 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
 import java.net.BindException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Optional;
@@ -30,6 +31,7 @@ import java.util.TimeZone;
 
 import javax.servlet.DispatcherType;
 
+import io.prometheus.client.exporter.MetricsServlet;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.pulsar.broker.web.AuthenticationFilter;
@@ -103,7 +105,8 @@ public class WorkerServer {
                 newServletContextHandler("/admin/v2", new ResourceConfig(Resources.getApiV2Resources()), workerService));
         handlers.add(
                 newServletContextHandler("/admin/v3", new ResourceConfig(Resources.getApiV3Resources()), workerService));
-        handlers.add(newServletContextHandler("/", new ResourceConfig(Resources.getRootResources()), workerService));
+        // don't require auth for metrics or config routes
+        handlers.add(newServletContextHandler("/", new ResourceConfig(Resources.getRootResources()), workerService, workerConfig.isAuthenticateMetricsEndpoint()));
 
         RequestLogHandler requestLogHandler = new RequestLogHandler();
         Slf4jRequestLog requestLog = new Slf4jRequestLog();
@@ -142,6 +145,10 @@ public class WorkerServer {
     }
 
     public static ServletContextHandler newServletContextHandler(String contextPath, ResourceConfig config, WorkerService workerService) {
+        return newServletContextHandler(contextPath, config, workerService, true);
+    }
+
+    public static ServletContextHandler newServletContextHandler(String contextPath, ResourceConfig config, WorkerService workerService, boolean requireAuthentication) {
         final ServletContextHandler contextHandler =
                 new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
 
@@ -153,7 +160,7 @@ public class WorkerServer {
         final ServletHolder apiServlet =
                 new ServletHolder(new ServletContainer(config));
         contextHandler.addServlet(apiServlet, "/*");
-        if (workerService.getWorkerConfig().isAuthenticationEnabled()) {
+        if (workerService.getWorkerConfig().isAuthenticationEnabled() && requireAuthentication) {
             FilterHolder filter = new FilterHolder(new AuthenticationFilter(workerService.getAuthenticationService()));
             contextHandler.addFilter(filter, MATCH_ALL, EnumSet.allOf(DispatcherType.class));
         }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java
index 9d7b316..e9cfbfa 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.functions.worker.rest.api;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
 import org.apache.pulsar.functions.worker.FunctionsStatsGenerator;
 import org.apache.pulsar.functions.worker.WorkerService;
@@ -31,17 +33,26 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.StreamingOutput;
+import java.io.IOException;
+import java.io.Writer;
+import java.nio.CharBuffer;
+import java.nio.charset.StandardCharsets;
 
 @Path("/")
 public class FunctionsMetricsResource extends FunctionApiResource {
     @Path("metrics")
     @GET
     @Produces(MediaType.TEXT_PLAIN)
-    public Response getMetrics() {
+    public Response getMetrics() throws IOException {
 
         WorkerService workerService = get();
-
         ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
+        // if request, also attach the prometheus metrics
+        if (workerService.getWorkerConfig().isIncludeStandardPrometheusMetrics()) {
+            Writer writer = new BufWriter(buf);
+            TextFormat.write004(writer, CollectorRegistry.defaultRegistry.metricFamilySamples());
+        }
+
         try {
             SimpleTextOutputStream stream = new SimpleTextOutputStream(buf);
             FunctionsStatsGenerator.generate(workerService,"default", stream);
@@ -60,4 +71,28 @@ public class FunctionsMetricsResource extends FunctionApiResource {
             buf.release();
         }
     }
+
+    private static class BufWriter extends Writer {
+        private final ByteBuf buf;
+
+        public BufWriter(ByteBuf buf) {
+            this.buf = buf;
+        }
+
+        @Override
+        public void write(char[] cbuf, int off, int len) throws IOException {
+            buf.writeCharSequence(CharBuffer.wrap(cbuf, off, len), StandardCharsets.UTF_8);
+        }
+
+        @Override
+        public void flush() throws IOException {
+            // noop
+
+        }
+
+        @Override
+        public void close() throws IOException {
+            // noop
+        }
+    }
 }