You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/04/27 07:48:53 UTC

[pulsar] branch branch-2.10 updated (58323b01b6b -> 4af6b2620b0)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a change to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 58323b01b6b Fix topic closed normally but still call `closeFencedTopicForcefully`. (#15196) (#15202)
     new b2f54ba385b Add support of PrometheusRawMetricsProvider for the Pulsar-Proxy (#14681)
     new 4af6b2620b0 PIP-105: Fix error on recycled SubscriptionPropertiesList (#15335)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../PrometheusMetricsGeneratorUtils.java           | 106 ++++++++++++
 .../stats/prometheus/PrometheusMetricsServlet.java |  41 ++---
 .../prometheus/PrometheusRawMetricsProvider.java   |   0
 .../broker/stats/prometheus/package-info.java      |   0
 .../org/apache/pulsar/broker/PulsarService.java    |  37 +++--
 .../apache/pulsar/broker/service/ServerCnx.java    |  16 +-
 .../prometheus/PrometheusMetricsGenerator.java     |  54 +-----
 .../prometheus/PulsarPrometheusMetricsServlet.java |  52 ++++++
 .../apache/pulsar/proxy/server/ProxyService.java   |  37 +++++
 .../pulsar/proxy/server/ProxyServiceStarter.java   |  11 +-
 .../proxy/server/ProxyPrometheusMetricsTest.java   | 181 +++++++++++++++++++++
 11 files changed, 435 insertions(+), 100 deletions(-)
 create mode 100644 pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java
 rename {pulsar-broker => pulsar-broker-common}/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java (71%)
 rename {pulsar-broker => pulsar-broker-common}/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java (100%)
 copy {pulsar-broker => pulsar-broker-common}/src/main/java/org/apache/pulsar/broker/stats/prometheus/package-info.java (100%)
 create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java
 create mode 100644 pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java


[pulsar] 01/02: Add support of PrometheusRawMetricsProvider for the Pulsar-Proxy (#14681)

Posted by eo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b2f54ba385b49779151baff0874dce41faaa3ca4
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Wed Mar 16 11:58:20 2022 +0100

    Add support of PrometheusRawMetricsProvider for the Pulsar-Proxy (#14681)
    
    (cherry picked from commit ea95b28336acee8fe42a5dd8d95c92500dad08ea)
---
 .../PrometheusMetricsGeneratorUtils.java           | 106 ++++++++++++
 .../stats/prometheus/PrometheusMetricsServlet.java |  41 ++---
 .../prometheus/PrometheusRawMetricsProvider.java   |   0
 .../broker/stats/prometheus/package-info.java      |  14 --
 .../org/apache/pulsar/broker/PulsarService.java    |  37 +++--
 .../prometheus/PrometheusMetricsGenerator.java     |  54 +-----
 .../prometheus/PulsarPrometheusMetricsServlet.java |  52 ++++++
 .../apache/pulsar/proxy/server/ProxyService.java   |  37 +++++
 .../pulsar/proxy/server/ProxyServiceStarter.java   |  11 +-
 .../proxy/server/ProxyPrometheusMetricsTest.java   | 181 +++++++++++++++++++++
 10 files changed, 425 insertions(+), 108 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java
new file mode 100644
index 00000000000..0e298151971
--- /dev/null
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Enumeration;
+import java.util.List;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+
+/**
+ * Generate metrics in a text format suitable to be consumed by Prometheus.
+ * Format specification can be found at {@link https://prometheus.io/docs/instrumenting/exposition_formats/}
+ */
+public class PrometheusMetricsGeneratorUtils {
+
+    public static void generate(String cluster, OutputStream out,
+                                List<PrometheusRawMetricsProvider> metricsProviders)
+            throws IOException {
+        ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
+        try {
+            SimpleTextOutputStream stream = new SimpleTextOutputStream(buf);
+            generateSystemMetrics(stream, cluster);
+            if (metricsProviders != null) {
+                for (PrometheusRawMetricsProvider metricsProvider : metricsProviders) {
+                    metricsProvider.generate(stream);
+                }
+            }
+            out.write(buf.array(), buf.arrayOffset(), buf.readableBytes());
+        } finally {
+            buf.release();
+        }
+    }
+
+    public static void generateSystemMetrics(SimpleTextOutputStream stream, String cluster) {
+        Enumeration<Collector.MetricFamilySamples> metricFamilySamples =
+                CollectorRegistry.defaultRegistry.metricFamilySamples();
+        while (metricFamilySamples.hasMoreElements()) {
+            Collector.MetricFamilySamples metricFamily = metricFamilySamples.nextElement();
+
+            // Write type of metric
+            stream.write("# TYPE ").write(metricFamily.name).write(' ')
+                    .write(getTypeStr(metricFamily.type)).write('\n');
+
+            for (int i = 0; i < metricFamily.samples.size(); i++) {
+                Collector.MetricFamilySamples.Sample sample = metricFamily.samples.get(i);
+                stream.write(sample.name);
+                stream.write("{cluster=\"").write(cluster).write('"');
+                for (int j = 0; j < sample.labelNames.size(); j++) {
+                    String labelValue = sample.labelValues.get(j);
+                    if (labelValue != null) {
+                        labelValue = labelValue.replace("\"", "\\\"");
+                    }
+
+                    stream.write(",");
+                    stream.write(sample.labelNames.get(j));
+                    stream.write("=\"");
+                    stream.write(labelValue);
+                    stream.write('"');
+                }
+
+                stream.write("} ");
+                stream.write(Collector.doubleToGoString(sample.value));
+                stream.write('\n');
+            }
+        }
+    }
+
+    static String getTypeStr(Collector.Type type) {
+        switch (type) {
+            case COUNTER:
+                return "counter";
+            case GAUGE:
+                return "gauge";
+            case SUMMARY        :
+                return "summary";
+            case HISTOGRAM:
+                return "histogram";
+            case UNTYPED:
+            default:
+                return "untyped";
+        }
+    }
+
+}
+
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
similarity index 71%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
rename to pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
index 722c96c28a7..1c3277d5ee4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.broker.stats.prometheus;
 
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.EOFException;
 import java.io.IOException;
@@ -28,36 +28,28 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import javax.servlet.AsyncContext;
 import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-import org.apache.pulsar.broker.PulsarService;
-import org.eclipse.jetty.http.HttpStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class PrometheusMetricsServlet extends HttpServlet {
 
     private static final long serialVersionUID = 1L;
+    private static final int HTTP_STATUS_OK_200 = 200;
+    private static final int HTTP_STATUS_INTERNAL_SERVER_ERROR_500 = 500;
 
-    private final PulsarService pulsar;
-    private final boolean shouldExportTopicMetrics;
-    private final boolean shouldExportConsumerMetrics;
-    private final boolean shouldExportProducerMetrics;
     private final long metricsServletTimeoutMs;
-    private final boolean splitTopicAndPartitionLabel;
-    private List<PrometheusRawMetricsProvider> metricsProviders;
+    private final String cluster;
+    protected List<PrometheusRawMetricsProvider> metricsProviders;
 
     private ExecutorService executor = null;
 
-    public PrometheusMetricsServlet(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
-                                    boolean shouldExportProducerMetrics, boolean splitTopicAndPartitionLabel) {
-        this.pulsar = pulsar;
-        this.shouldExportTopicMetrics = includeTopicMetrics;
-        this.shouldExportConsumerMetrics = includeConsumerMetrics;
-        this.shouldExportProducerMetrics = shouldExportProducerMetrics;
-        this.metricsServletTimeoutMs = pulsar.getConfiguration().getMetricsServletTimeoutMs();
-        this.splitTopicAndPartitionLabel = splitTopicAndPartitionLabel;
+    public PrometheusMetricsServlet(long metricsServletTimeoutMs, String cluster) {
+        this.metricsServletTimeoutMs = metricsServletTimeoutMs;
+        this.cluster = cluster;
     }
 
     @Override
@@ -66,19 +58,16 @@ public class PrometheusMetricsServlet extends HttpServlet {
     }
 
     @Override
-    protected void doGet(HttpServletRequest request, HttpServletResponse response)
-            throws ServletException, IOException {
+    protected void doGet(HttpServletRequest request, HttpServletResponse response) {
         AsyncContext context = request.startAsync();
         context.setTimeout(metricsServletTimeoutMs);
         executor.execute(safeRun(() -> {
             long start = System.currentTimeMillis();
             HttpServletResponse res = (HttpServletResponse) context.getResponse();
             try {
-                res.setStatus(HttpStatus.OK_200);
+                res.setStatus(HTTP_STATUS_OK_200);
                 res.setContentType("text/plain");
-                PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, shouldExportConsumerMetrics,
-                        shouldExportProducerMetrics, splitTopicAndPartitionLabel, res.getOutputStream(),
-                        metricsProviders);
+                generateMetrics(cluster, res.getOutputStream());
             } catch (Exception e) {
                 long end = System.currentTimeMillis();
                 long time = end - start;
@@ -90,7 +79,7 @@ public class PrometheusMetricsServlet extends HttpServlet {
                 } else {
                     log.error("Failed to generate prometheus stats, {} ms elapsed", time, e);
                 }
-                res.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500);
+                res.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500);
             } finally {
                 long end = System.currentTimeMillis();
                 long time = end - start;
@@ -106,6 +95,10 @@ public class PrometheusMetricsServlet extends HttpServlet {
         }));
     }
 
+    protected void generateMetrics(String cluster, ServletOutputStream outputStream) throws IOException {
+        PrometheusMetricsGeneratorUtils.generate(cluster, outputStream, metricsProviders);
+    }
+
     @Override
     public void destroy() {
         if (executor != null) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java
similarity index 100%
copy from pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java
copy to pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/package-info.java
similarity index 69%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java
rename to pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/package-info.java
index d2067766a41..3723fb4ff5c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/package-info.java
@@ -17,17 +17,3 @@
  * under the License.
  */
 package org.apache.pulsar.broker.stats.prometheus;
-
-import org.apache.pulsar.common.util.SimpleTextOutputStream;
-
-/**
- * The prometheus metrics provider for generate prometheus format metrics.
- */
-public interface PrometheusRawMetricsProvider {
-
-    /**
-     * Generate the metrics from the metrics provider.
-     * @param stream the stream that write the metrics to
-     */
-    void generate(SimpleTextOutputStream stream);
-}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index e7eb8a09986..08fff1eea36 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -106,8 +106,8 @@ import org.apache.pulsar.broker.service.TopicPoliciesService;
 import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.stats.MetricsGenerator;
-import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.broker.stats.prometheus.PulsarPrometheusMetricsServlet;
 import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
 import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
@@ -245,7 +245,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
 
     // packages management service
     private Optional<PackagesManagement> packagesManagement = Optional.empty();
-    private PrometheusMetricsServlet metricsServlet;
+    private PulsarPrometheusMetricsServlet metricsServlet;
     private List<PrometheusRawMetricsProvider> pendingMetricsProviders;
 
     private MetadataStoreExtended localMetadataStore;
@@ -414,7 +414,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                 }
             }
 
-            metricsServlet = null;
+            resetMetricsServlet();
 
             if (this.webSocketService != null) {
                 this.webSocketService.close();
@@ -571,6 +571,10 @@ public class PulsarService implements AutoCloseable, ShutdownService {
         }
     }
 
+    private synchronized void resetMetricsServlet() {
+        metricsServlet = null;
+    }
+
     private CompletableFuture<Void> addTimeoutHandling(CompletableFuture<Void> future) {
         ScheduledExecutorService shutdownExecutor = Executors.newSingleThreadScheduledExecutor(
                 new DefaultThreadFactory(getClass().getSimpleName() + "-shutdown"));
@@ -698,16 +702,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
             this.brokerAdditionalServlets = AdditionalServlets.load(config);
 
             this.webService = new WebService(this);
-            this.metricsServlet = new PrometheusMetricsServlet(
-                    this, config.isExposeTopicLevelMetricsInPrometheus(),
-                    config.isExposeConsumerLevelMetricsInPrometheus(),
-                    config.isExposeProducerLevelMetricsInPrometheus(),
-                    config.isSplitTopicAndPartitionLabelInPrometheus());
-            if (pendingMetricsProviders != null) {
-                pendingMetricsProviders.forEach(provider -> metricsServlet.addRawMetricsProvider(provider));
-                this.pendingMetricsProviders = null;
-            }
-
+            createMetricsServlet();
             this.addWebServerHandlers(webService, metricsServlet, this.config);
             this.webService.start();
             heartbeatNamespaceV1 = NamespaceService.getHeartbeatNamespace(this.advertisedAddress, this.config);
@@ -827,8 +822,20 @@ public class PulsarService implements AutoCloseable, ShutdownService {
         }
     }
 
+    private synchronized void createMetricsServlet() {
+        this.metricsServlet = new PulsarPrometheusMetricsServlet(
+                this, config.isExposeTopicLevelMetricsInPrometheus(),
+                config.isExposeConsumerLevelMetricsInPrometheus(),
+                config.isExposeProducerLevelMetricsInPrometheus(),
+                config.isSplitTopicAndPartitionLabelInPrometheus());
+        if (pendingMetricsProviders != null) {
+            pendingMetricsProviders.forEach(provider -> metricsServlet.addRawMetricsProvider(provider));
+            this.pendingMetricsProviders = null;
+        }
+    }
+
     private void addWebServerHandlers(WebService webService,
-                                      PrometheusMetricsServlet metricsServlet,
+                                      PulsarPrometheusMetricsServlet metricsServlet,
                                       ServiceConfiguration config)
             throws PulsarServerException, PulsarClientException, MalformedURLException, ServletException,
             DeploymentException {
@@ -1525,7 +1532,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
         return resourceUsageTransportManager;
     }
 
-    public void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) {
+    public synchronized void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) {
         if (metricsServlet == null) {
             if (pendingMetricsProviders == null) {
                 pendingMetricsProviders = new LinkedList<>();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 9d5e1c77c69..a585601d545 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -18,12 +18,12 @@
  */
 package org.apache.pulsar.broker.stats.prometheus;
 
+import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGeneratorUtils.generateSystemMetrics;
+import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGeneratorUtils.getTypeStr;
 import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.prometheus.client.Collector;
-import io.prometheus.client.Collector.MetricFamilySamples;
-import io.prometheus.client.Collector.MetricFamilySamples.Sample;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Gauge;
 import io.prometheus.client.Gauge.Child;
@@ -34,7 +34,6 @@ import java.io.StringWriter;
 import java.io.Writer;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Enumeration;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -230,53 +229,4 @@ public class PrometheusMetricsGenerator {
         }
     }
 
-    private static void generateSystemMetrics(SimpleTextOutputStream stream, String cluster) {
-        Enumeration<MetricFamilySamples> metricFamilySamples = CollectorRegistry.defaultRegistry.metricFamilySamples();
-        while (metricFamilySamples.hasMoreElements()) {
-            MetricFamilySamples metricFamily = metricFamilySamples.nextElement();
-
-            // Write type of metric
-            stream.write("# TYPE ").write(metricFamily.name).write(' ')
-                    .write(getTypeStr(metricFamily.type)).write('\n');
-
-            for (int i = 0; i < metricFamily.samples.size(); i++) {
-                Sample sample = metricFamily.samples.get(i);
-                stream.write(sample.name);
-                stream.write("{cluster=\"").write(cluster).write('"');
-                for (int j = 0; j < sample.labelNames.size(); j++) {
-                    String labelValue = sample.labelValues.get(j);
-                    if (labelValue != null) {
-                        labelValue = labelValue.replace("\"", "\\\"");
-                    }
-
-                    stream.write(",");
-                    stream.write(sample.labelNames.get(j));
-                    stream.write("=\"");
-                    stream.write(labelValue);
-                    stream.write('"');
-                }
-
-                stream.write("} ");
-                stream.write(Collector.doubleToGoString(sample.value));
-                stream.write('\n');
-            }
-        }
-    }
-
-    static String getTypeStr(Collector.Type type) {
-        switch (type) {
-        case COUNTER:
-            return "counter";
-        case GAUGE:
-            return "gauge";
-        case SUMMARY        :
-            return "summary";
-        case HISTOGRAM:
-            return "histogram";
-        case UNTYPED:
-        default:
-            return "untyped";
-        }
-    }
-
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java
new file mode 100644
index 00000000000..3d64f22d630
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus;
+
+import java.io.IOException;
+import javax.servlet.ServletOutputStream;
+import org.apache.pulsar.broker.PulsarService;
+
+public class PulsarPrometheusMetricsServlet extends PrometheusMetricsServlet {
+
+    private static final long serialVersionUID = 1L;
+
+    private final PulsarService pulsar;
+    private final boolean shouldExportTopicMetrics;
+    private final boolean shouldExportConsumerMetrics;
+    private final boolean shouldExportProducerMetrics;
+    private final boolean splitTopicAndPartitionLabel;
+
+    public PulsarPrometheusMetricsServlet(PulsarService pulsar, boolean includeTopicMetrics,
+                                          boolean includeConsumerMetrics, boolean shouldExportProducerMetrics,
+                                          boolean splitTopicAndPartitionLabel) {
+        super(pulsar.getConfiguration().getMetricsServletTimeoutMs(), pulsar.getConfiguration().getClusterName());
+        this.pulsar = pulsar;
+        this.shouldExportTopicMetrics = includeTopicMetrics;
+        this.shouldExportConsumerMetrics = includeConsumerMetrics;
+        this.shouldExportProducerMetrics = shouldExportProducerMetrics;
+        this.splitTopicAndPartitionLabel = splitTopicAndPartitionLabel;
+    }
+
+    @Override
+    protected void generateMetrics(String cluster, ServletOutputStream outputStream) throws IOException {
+        PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, shouldExportConsumerMetrics,
+                shouldExportProducerMetrics, splitTopicAndPartitionLabel, outputStream,
+                metricsProviders);
+    }
+}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 8bb1ff752f1..6a830657423 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -38,6 +38,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -54,6 +55,8 @@ import org.apache.pulsar.broker.ServiceConfigurationUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
 import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
@@ -134,6 +137,9 @@ public class ProxyService implements Closeable {
     @Getter
     private AdditionalServlets proxyAdditionalServlets;
 
+    private PrometheusMetricsServlet metricsServlet;
+    private List<PrometheusRawMetricsProvider> pendingMetricsProviders;
+
     public ProxyService(ProxyConfiguration proxyConfig,
                         AuthenticationService authenticationService) throws Exception {
         requireNonNull(proxyConfig);
@@ -250,6 +256,8 @@ public class ProxyService implements Closeable {
             this.serviceUrlTls = null;
         }
 
+        createMetricsServlet();
+
         // Initialize the message protocol handlers.
         // start the protocol handlers only after the broker is ready,
         // so that the protocol handlers can access broker service properly.
@@ -259,6 +267,14 @@ public class ProxyService implements Closeable {
         startProxyExtensions(protocolHandlerChannelInitializers, bootstrap);
     }
 
+    private synchronized void createMetricsServlet() {
+        this.metricsServlet = new PrometheusMetricsServlet(-1L, proxyConfig.getClusterName());
+        if (pendingMetricsProviders != null) {
+            pendingMetricsProviders.forEach(provider -> metricsServlet.addRawMetricsProvider(provider));
+            this.pendingMetricsProviders = null;
+        }
+    }
+
     // This call is used for starting additional protocol handlers
     public void startProxyExtensions(Map<String, Map<InetSocketAddress,
             ChannelInitializer<SocketChannel>>> protocolHandlers, ServerBootstrap serverBootstrap) {
@@ -335,6 +351,8 @@ public class ProxyService implements Closeable {
             proxyAdditionalServlets = null;
         }
 
+        resetMetricsServlet();
+
         if (localMetadataStore != null) {
             try {
                 localMetadataStore.close();
@@ -356,6 +374,10 @@ public class ProxyService implements Closeable {
         }
     }
 
+    private synchronized void resetMetricsServlet() {
+        metricsServlet = null;
+    }
+
     public String getServiceUrl() {
         return serviceUrl;
     }
@@ -414,5 +436,20 @@ public class ProxyService implements Closeable {
         return this.proxyClientAuthentication;
     }
 
+    public synchronized PrometheusMetricsServlet getMetricsServlet() {
+        return metricsServlet;
+    }
+
+    public synchronized void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) {
+        if (metricsServlet == null) {
+            if (pendingMetricsProviders == null) {
+                pendingMetricsProviders = new LinkedList<>();
+            }
+            pendingMetricsProviders.add(metricsProvider);
+        } else {
+            this.metricsServlet.addRawMetricsProvider(metricsProvider);
+        }
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(ProxyService.class);
 }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 32b6fa13314..4df36229b01 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -32,7 +32,6 @@ import io.netty.util.internal.PlatformDependent;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Gauge;
 import io.prometheus.client.Gauge.Child;
-import io.prometheus.client.exporter.MetricsServlet;
 import io.prometheus.client.hotspot.DefaultExports;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
@@ -44,6 +43,7 @@ import org.apache.logging.log4j.core.util.datetime.FixedDateFormat;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
 import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.configuration.VipStatus;
@@ -240,8 +240,13 @@ public class ProxyServiceStarter {
                                      ProxyConfiguration config,
                                      ProxyService service,
                                      BrokerDiscoveryProvider discoveryProvider) throws Exception {
-        server.addServlet("/metrics", new ServletHolder(MetricsServlet.class),
-                Collections.emptyList(), config.isAuthenticateMetricsEndpoint());
+        if (service != null) {
+            PrometheusMetricsServlet metricsServlet = service.getMetricsServlet();
+            if (metricsServlet != null) {
+                server.addServlet("/metrics", new ServletHolder(metricsServlet),
+                        Collections.emptyList(), config.isAuthenticateMetricsEndpoint());
+            }
+        }
         server.addRestResources("/", VipStatus.class.getPackage().getName(),
                 VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath());
         server.addRestResources("/proxy-stats", ProxyStats.class.getPackage().getName(),
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
new file mode 100644
index 00000000000..63ac43d3210
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.doReturn;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import io.prometheus.client.Counter;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.Response;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.logging.LoggingFeature;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class ProxyPrometheusMetricsTest extends MockedPulsarServiceBaseTest {
+
+    public static final String TEST_CLUSTER = "test-cluster";
+    private ProxyService proxyService;
+    private WebServer proxyWebServer;
+    private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    @Override
+    @BeforeClass
+    protected void setup() throws Exception {
+        internalSetup();
+
+        proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+        proxyConfig.setWebServicePort(Optional.of(0));
+        proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
+        proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+        proxyConfig.setClusterName(TEST_CLUSTER);
+
+        proxyService = Mockito.spy(new ProxyService(proxyConfig,
+                new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig))));
+        doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+
+        proxyService.start();
+
+        proxyService.addPrometheusRawMetricsProvider(stream -> stream.write("test_metrics{label1=\"xyz\"} 10 \n"));
+
+        AuthenticationService authService = new AuthenticationService(
+                PulsarConfigurationLoader.convertFrom(proxyConfig));
+
+        proxyWebServer = new WebServer(proxyConfig, authService);
+        ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, proxyService, null);
+        proxyWebServer.start();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        internalCleanup();
+        proxyWebServer.stop();
+        proxyService.close();
+    }
+
+    /**
+     * Validates proxy Prometheus endpoint.
+     */
+    @Test
+    public void testMetrics() {
+        Counter.build("test_counter", "a test counter").create().register();
+
+        Client httpClient = ClientBuilder.newClient(new ClientConfig().register(LoggingFeature.class));
+        Response r = httpClient.target(proxyWebServer.getServiceUri()).path("/metrics").request()
+                .get();
+        Assert.assertEquals(r.getStatus(), Response.Status.OK.getStatusCode());
+        String response = r.readEntity(String.class).trim();
+
+        Multimap<String, Metric> metrics = parseMetrics(response);
+
+        // Check that ProxyService metrics are present
+        List<Metric> cm = (List<Metric>) metrics.get("pulsar_proxy_binary_bytes");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).tags.get("cluster"), TEST_CLUSTER);
+
+        // Check that PrometheusRawMetricsProvider metrics are present
+        List<Metric> cm2 = (List<Metric>) metrics.get("test_metrics");
+        assertEquals(cm2.size(), 1);
+        assertEquals(cm2.get(0).tags.get("label1"), "xyz");
+
+        // Check that any Prometheus metric registered in the default CollectorRegistry is present
+        List<Metric> cm3 = (List<Metric>) metrics.get("test_counter");
+        assertEquals(cm3.size(), 1);
+        assertEquals(cm3.get(0).tags.get("cluster"), TEST_CLUSTER);
+    }
+
+    /**
+     * Hacky parsing of Prometheus text format. Should be good enough for unit tests.
+     */
+    public static Multimap<String, Metric> parseMetrics(String metrics) {
+        Multimap<String, Metric> parsed = ArrayListMultimap.create();
+
+        // Examples of lines are
+        // jvm_threads_current{cluster="standalone",} 203.0
+        // or
+        // pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1",
+        // topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897
+        Pattern pattern = Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s([+-]?[\\d\\w\\.-]+)(\\s(\\d+))?$");
+        Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
+
+        Splitter.on("\n").split(metrics).forEach(line -> {
+            if (line.isEmpty() || line.startsWith("#")) {
+                return;
+            }
+
+            Matcher matcher = pattern.matcher(line);
+            assertTrue(matcher.matches(), "line " + line + " does not match pattern " + pattern);
+            String name = matcher.group(1);
+
+            Metric m = new Metric();
+            String numericValue = matcher.group(3);
+            if (numericValue.equalsIgnoreCase("-Inf")) {
+                m.value = Double.NEGATIVE_INFINITY;
+            } else if (numericValue.equalsIgnoreCase("+Inf")) {
+                m.value = Double.POSITIVE_INFINITY;
+            } else {
+                m.value = Double.parseDouble(numericValue);
+            }
+            String tags = matcher.group(2);
+            Matcher tagsMatcher = tagsPattern.matcher(tags);
+            while (tagsMatcher.find()) {
+                String tag = tagsMatcher.group(1);
+                String value = tagsMatcher.group(2);
+                m.tags.put(tag, value);
+            }
+
+            parsed.put(name, m);
+        });
+
+        return parsed;
+    }
+
+    static class Metric {
+        Map<String, String> tags = new TreeMap<>();
+        double value;
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this).add("tags", tags).add("value", value).toString();
+        }
+    }
+
+}


[pulsar] 02/02: PIP-105: Fix error on recycled SubscriptionPropertiesList (#15335)

Posted by eo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4af6b2620b09433505728ef696c2bbaf684858c7
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Wed Apr 27 09:47:54 2022 +0200

    PIP-105: Fix error on recycled SubscriptionPropertiesList (#15335)
    
    Sometimes the CommandSubscribe object has already been released and it triggers this error:
    17:36:40.676 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] WARN  org.apache.pulsar.broker.service.ServerCnx - [/192.168.1.111:50688][persistent://public/default/test-cb4105f6-f850-4bdf-9e79-66d4ac42658c][13b9ee68-4ee4-4845-b955-77420b8b6a29] Failed to create consumer: consumerId=0, refCnt: 0
    java.util.concurrent.CompletionException: io.netty.util.IllegalReferenceCountException: refCnt: 0
            at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) ~[?:?]
            at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319) ~[?:?]
            at java.util.concurrent.CompletableFuture.tryFire(CompletableFuture.java:1081) ~[?:?]
            at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
            at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
            at org.apache.pulsar.broker.service.BrokerService.lambda(BrokerService.java:1419) ~[pulsar-broker-2.10.0.jar:2.10.0]
            at java.util.concurrent.CompletableFuture.uniRunNow(CompletableFuture.java:815) ~[?:?]
            at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:799) ~[?:?]
            at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2121) ~[?:?]
            at org.apache.pulsar.broker.service.BrokerService.openLedgerComplete(BrokerService.java:1405) ~[pulsar-broker-2.10.0.jar:2.10.0]
            at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda(ManagedLedgerFactoryImpl.java:425) ~[managed-ledger-2.10.0.jar:2.10.0]
            at java.util.concurrent.CompletableFuture.tryFire2168(CompletableFuture.java:714) ~[?:?]
            at java.util.concurrent.CompletableFuture.tryFire(CompletableFuture.java) ~[?:?]
            at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
            at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
            at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.initializeComplete(ManagedLedgerFactoryImpl.java:392) ~[managed-ledger-2.10.0.jar:2.10.0]
            at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.operationComplete(ManagedLedgerImpl.java:525) ~[managed-ledger-2.10.0.jar:2.10.0]
            at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.operationComplete(ManagedLedgerImpl.java:515) ~[managed-ledger-2.10.0.jar:2.10.0]
            at org.apache.bookkeeper.mledger.impl.MetaStoreImpl.lambda(MetaStoreImpl.java:167) ~[managed-ledger-2.10.0.jar:2.10.0]
            at java.util.concurrent.CompletableFuture.tryFire2168(CompletableFuture.java:714) [?:?]
            at java.util.concurrent.CompletableFuture.tryFire(CompletableFuture.java) [?:?]
            at java.util.concurrent.CompletableFuture.run(CompletableFuture.java:478) [?:?]
            at org.apache.bookkeeper.common.util.OrderedExecutor.run(OrderedExecutor.java:203) [bookkeeper-common-4.14.4.jar:4.14.4]
            at java.util.concurrent.Executors.call(Executors.java:515) [?:?]
            at java.util.concurrent.FutureTask.run2168(FutureTask.java:264) [?:?]
            at java.util.concurrent.FutureTask.run(FutureTask.java) [?:?]
            at java.util.concurrent.ScheduledThreadPoolExecutor.run(ScheduledThreadPoolExecutor.java:304) [?:?]
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
            at java.util.concurrent.ThreadPoolExecutor.run(ThreadPoolExecutor.java:628) [?:?]
            at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.74.Final.jar:4.1.74.Final]
            at java.lang.Thread.run(Thread.java:829) [?:?]
    Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0
            at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1454) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
            at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1383) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
            at io.netty.buffer.UnsafeByteBufUtil.getBytes(UnsafeByteBufUtil.java:481) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
            at io.netty.buffer.PooledUnsafeDirectByteBuf.getBytes(PooledUnsafeDirectByteBuf.java:130) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
            at io.netty.buffer.PooledSlicedByteBuf.getBytes(PooledSlicedByteBuf.java:235) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
            at io.netty.buffer.ByteBufUtil.decodeString(ByteBufUtil.java:1270) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
            at io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1246) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
            at org.apache.pulsar.common.api.proto.LightProtoCodec.readString(LightProtoCodec.java:250) ~[pulsar-common-2.10.0.jar:2.10.0]
            at org.apache.pulsar.common.api.proto.KeyValue.getKey(KeyValue.java:19) ~[pulsar-common-2.10.0.jar:2.10.0]
            at java.util.stream.Collectors.lambda(Collectors.java:1658) ~[?:?]
            at java.util.stream.ReduceOpsReducingSink.accept(ReduceOps.java:169) ~[?:?]
            at java.util.ArrayList.forEachRemaining(ArrayList.java:1511) ~[?:?]
            at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
            at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[?:?]
            at java.util.stream.ReduceOps.evaluateSequential(ReduceOps.java:913) ~[?:?]
            at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
            at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) ~[?:?]
            at org.apache.pulsar.broker.service.SubscriptionOption.getPropertiesMap(SubscriptionOption.java:57) ~[pulsar-broker-2.10.0.jar:2.10.0]
            at org.apache.pulsar.broker.service.ServerCnx.lambda(ServerCnx.java:1047) ~[pulsar-broker-2.10.0.jar:2.10.0]
            at java.util.concurrent.CompletableFuture.tryFire(CompletableFuture.java:1072) ~[?:?]
            ... 28 more
    
    (cherry picked from commit e78d9f1ac546c150f4068c148e5ffe95c2ddf1f9)
---
 .../java/org/apache/pulsar/broker/service/ServerCnx.java | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index dd78eec80b7..46691273e31 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1016,6 +1016,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 boolean createTopicIfDoesNotExist = forceTopicCreation
                         && service.isAllowAutoTopicCreation(topicName.toString());
 
+                final long consumerEpoch;
+                if (subscribe.hasConsumerEpoch()) {
+                    consumerEpoch = subscribe.getConsumerEpoch();
+                } else {
+                    consumerEpoch = DEFAULT_CONSUMER_EPOCH;
+                }
+                Optional<Map<String, String>> subscriptionProperties = SubscriptionOption.getPropertiesMap(
+                        subscribe.getSubscriptionPropertiesList());
                 service.getTopic(topicName.toString(), createTopicIfDoesNotExist)
                         .thenCompose(optTopic -> {
                             if (!optTopic.isPresent()) {
@@ -1037,10 +1045,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                                 new SubscriptionNotFoundException(
                                                         "Subscription does not exist"));
                             }
-                            long consumerEpoch = DEFAULT_CONSUMER_EPOCH;
-                            if (subscribe.hasConsumerEpoch()) {
-                                consumerEpoch = subscribe.getConsumerEpoch();
-                            }
+
                             SubscriptionOption option = SubscriptionOption.builder().cnx(ServerCnx.this)
                                     .subscriptionName(subscriptionName)
                                     .consumerId(consumerId).subType(subType).priorityLevel(priorityLevel)
@@ -1049,8 +1054,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                     .initialPosition(initialPosition)
                                     .startMessageRollbackDurationSec(startMessageRollbackDurationSec)
                                     .replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta)
-                                    .subscriptionProperties(SubscriptionOption.getPropertiesMap(
-                                            subscribe.getSubscriptionPropertiesList()))
+                                    .subscriptionProperties(subscriptionProperties)
                                     .consumerEpoch(consumerEpoch)
                                     .build();
                             if (schema != null) {