You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2023/08/24 05:02:18 UTC

[pulsar] branch branch-3.1 updated: [improve][proxy] Support disabling metrics endpoint (#21031)

This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new fd6c4f817c3 [improve][proxy] Support disabling metrics endpoint (#21031)
fd6c4f817c3 is described below

commit fd6c4f817c3d8c9e6e8afffe5080637ef9727e56
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Sat Aug 19 10:03:10 2023 -0500

    [improve][proxy] Support disabling metrics endpoint (#21031)
    
    (cherry picked from commit d06cda6cd8a58b8a7e0678183f05c08059ddb9b2)
---
 conf/proxy.conf                                    |  2 ++
 .../pulsar/proxy/server/ProxyConfiguration.java    |  6 ++++
 .../pulsar/proxy/server/ProxyServiceStarter.java   | 18 ++++++----
 .../org/apache/pulsar/proxy/server/WebServer.java  | 24 ++++++++-----
 .../org/apache/pulsar/proxy/stats/ProxyStats.java  | 39 +++++++++++++++++++++-
 5 files changed, 72 insertions(+), 17 deletions(-)

diff --git a/conf/proxy.conf b/conf/proxy.conf
index 2fda32abc1b..c41c54670ee 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -370,6 +370,8 @@ zooKeeperCacheExpirySeconds=-1
 
 ### --- Metrics --- ###
 
+# Whether to enable the proxy's /metrics, /proxy-stats, and /status.html http endpoints
+enableProxyStatsEndpoints=true
 # Whether the '/metrics' endpoint requires authentication. Defaults to true
 authenticateMetricsEndpoint=true
 # Enable cache metrics data, default value is false
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 3ecd670cbbf..a4cb7926beb 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -371,6 +371,12 @@ public class ProxyConfiguration implements PulsarConfiguration {
     )
     private int authenticationRefreshCheckSeconds = 60;
 
+    @FieldContext(
+        category = CATEGORY_HTTP,
+        doc = "Whether to enable the proxy's /metrics, /proxy-stats, and /status.html http endpoints"
+    )
+    private boolean enableProxyStatsEndpoints = true;
+
     @FieldContext(
         category = CATEGORY_AUTHENTICATION,
         doc = "Whether the '/metrics' endpoint requires authentication. Defaults to true."
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index b4854780d54..ee8f648182d 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -253,15 +253,19 @@ public class ProxyServiceStarter {
                                      ProxyConfiguration config,
                                      ProxyService service,
                                      BrokerDiscoveryProvider discoveryProvider) throws Exception {
-        if (service != null) {
-            PrometheusMetricsServlet metricsServlet = service.getMetricsServlet();
-            if (metricsServlet != null) {
-                server.addServlet("/metrics", new ServletHolder(metricsServlet),
-                        Collections.emptyList(), config.isAuthenticateMetricsEndpoint());
+        if (config.isEnableProxyStatsEndpoints()) {
+            server.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath(),
+                    VipStatus.class);
+            server.addRestResource("/proxy-stats", ProxyStats.ATTRIBUTE_PULSAR_PROXY_NAME, service,
+                    ProxyStats.class);
+            if (service != null) {
+                PrometheusMetricsServlet metricsServlet = service.getMetricsServlet();
+                if (metricsServlet != null) {
+                    server.addServlet("/metrics", new ServletHolder(metricsServlet),
+                            Collections.emptyList(), config.isAuthenticateMetricsEndpoint());
+                }
             }
         }
-        server.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath(), VipStatus.class);
-        server.addRestResource("/proxy-stats", ProxyStats.ATTRIBUTE_PULSAR_PROXY_NAME, service, ProxyStats.class);
 
         AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, discoveryProvider);
         ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
index 1ca8dc93ebf..edbcfe0847c 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
@@ -197,12 +197,20 @@ public class WebServer {
 
     public void addServlet(String basePath, ServletHolder servletHolder,
                            List<Pair<String, Object>> attributes, boolean requireAuthentication) {
+        addServlet(basePath, servletHolder, attributes, requireAuthentication, true);
+    }
+
+    private void addServlet(String basePath, ServletHolder servletHolder,
+            List<Pair<String, Object>> attributes, boolean requireAuthentication, boolean checkForExistingPaths) {
         popularServletParams(servletHolder, config);
 
-        Optional<String> existingPath = servletPaths.stream().filter(p -> p.startsWith(basePath)).findFirst();
-        if (existingPath.isPresent()) {
-            throw new IllegalArgumentException(
-                    String.format("Cannot add servlet at %s, path %s already exists", basePath, existingPath.get()));
+        if (checkForExistingPaths) {
+            Optional<String> existingPath = servletPaths.stream().filter(p -> p.startsWith(basePath)).findFirst();
+            if (existingPath.isPresent()) {
+                throw new IllegalArgumentException(
+                        String.format("Cannot add servlet at %s, path %s already exists", basePath,
+                                existingPath.get()));
+            }
         }
         servletPaths.add(basePath);
 
@@ -237,11 +245,9 @@ public class WebServer {
         config.register(JsonMapperProvider.class);
         ServletHolder servletHolder = new ServletHolder(new ServletContainer(config));
         servletHolder.setAsyncSupported(true);
-        ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
-        context.setContextPath(basePath);
-        context.addServlet(servletHolder, MATCH_ALL);
-        context.setAttribute(attribute, attributeValue);
-        handlers.add(context);
+        // This method has not historically checked for existing paths, so we don't check here either. The
+        // method call is added to reduce code duplication.
+        addServlet(basePath, servletHolder, Collections.singletonList(Pair.of(attribute, attributeValue)), true, false);
     }
 
     public int getExternalServicePort() {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ProxyStats.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ProxyStats.java
index afa2e12dabb..67fe30db161 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ProxyStats.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ProxyStats.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.proxy.stats;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
 import io.netty.channel.Channel;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
@@ -27,7 +28,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
@@ -36,19 +40,27 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response.Status;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationParameters;
+import org.apache.pulsar.broker.web.AuthenticationFilter;
 import org.apache.pulsar.proxy.server.ProxyService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Path("/")
 @Api(value = "/proxy-stats", description = "Stats for proxy", tags = "proxy-stats", hidden = true)
 @Produces(MediaType.APPLICATION_JSON)
 public class ProxyStats {
 
+    private static final Logger log = LoggerFactory.getLogger(ProxyStats.class);
     public static final String ATTRIBUTE_PULSAR_PROXY_NAME = "pulsar-proxy";
 
     private ProxyService service;
 
     @Context
     protected ServletContext servletContext;
+    @Context
+    protected HttpServletRequest httpRequest;
 
     @GET
     @Path("/connections")
@@ -56,6 +68,7 @@ public class ProxyStats {
             response = List.class, responseContainer = "List")
     @ApiResponses(value = { @ApiResponse(code = 503, message = "Proxy service is not initialized") })
     public List<ConnectionStats> metrics() {
+        throwIfNotSuperUser("metrics");
         List<ConnectionStats> stats = new ArrayList<>();
         proxyService().getClientCnxs().forEach(cnx -> {
             if (cnx.getDirectProxyHandler() == null) {
@@ -76,7 +89,7 @@ public class ProxyStats {
     @ApiResponses(value = { @ApiResponse(code = 412, message = "Proxy logging should be > 2 to capture topic stats"),
             @ApiResponse(code = 503, message = "Proxy service is not initialized") })
     public Map<String, TopicStats> topics() {
-
+        throwIfNotSuperUser("topics");
         Optional<Integer> logLevel = proxyService().getConfiguration().getProxyLogLevel();
         if (!logLevel.isPresent() || logLevel.get() < 2) {
             throw new RestException(Status.PRECONDITION_FAILED, "Proxy doesn't have logging level 2");
@@ -90,6 +103,7 @@ public class ProxyStats {
             notes = "It only changes log-level in memory, change it config file to persist the change")
     @ApiResponses(value = { @ApiResponse(code = 412, message = "Proxy log level can be [0-2]"), })
     public void updateProxyLogLevel(@PathParam("logLevel") int logLevel) {
+        throwIfNotSuperUser("updateProxyLogLevel");
         if (logLevel < 0 || logLevel > 2) {
             throw new RestException(Status.PRECONDITION_FAILED, "Proxy log level can be only [0-2]");
         }
@@ -100,6 +114,7 @@ public class ProxyStats {
     @Path("/logging")
     @ApiOperation(hidden = true, value = "Get proxy logging")
     public int getProxyLogLevel(@PathParam("logLevel") int logLevel) {
+        throwIfNotSuperUser("getProxyLogLevel");
         return proxyService().getProxyLogLevel();
     }
 
@@ -112,4 +127,43 @@ public class ProxyStats {
         }
         return service;
     }
+
+    /**
+     * Rejects the request unless authorization is disabled or the caller is a super user.
+     *
+     * <p>The caller's role and authentication data are read from the request attributes that
+     * {@link AuthenticationFilter} names via its {@code AuthenticatedRoleAttributeName} and
+     * {@code AuthenticatedDataAttributeName} constants.
+     *
+     * @param action name of the attempted operation; used only in the log message
+     * @throws org.apache.pulsar.common.util.RestException with {@code UNAUTHORIZED} when the role is
+     *         missing or is not a super user, or {@code INTERNAL_SERVER_ERROR} when the check fails,
+     *         times out, or is interrupted
+     */
+    private void throwIfNotSuperUser(String action) {
+        if (proxyService().getConfiguration().isAuthorizationEnabled()) {
+            AuthenticationParameters authParams = AuthenticationParameters.builder()
+                    .clientRole((String) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName))
+                    .clientAuthenticationDataSource((AuthenticationDataSource)
+                            httpRequest.getAttribute(AuthenticationFilter.AuthenticatedDataAttributeName))
+                    .build();
+            try {
+                if (authParams.getClientRole() == null
+                        || !proxyService().getAuthorizationService().isSuperUser(authParams).get(30, SECONDS)) {
+                    log.error("Client with role [{}] is not authorized to {}", authParams.getClientRole(), action);
+                    throw new org.apache.pulsar.common.util.RestException(Status.UNAUTHORIZED,
+                            "Client is not authorized to perform operation");
+                }
+            } catch (ExecutionException | TimeoutException | InterruptedException e) {
+                if (e instanceof InterruptedException) {
+                    // Restore the interrupt flag so code further up the stack can still observe it.
+                    Thread.currentThread().interrupt();
+                }
+                // Not every failure here is a time-out, so log the actual exception instead of assuming one.
+                log.warn("Failed to check whether role {} is a super user role (timeout {} sec)",
+                        authParams.getClientRole(), 30, e);
+                throw new org.apache.pulsar.common.util.RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+            }
+        }
+    }
 }