You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/10/19 00:20:50 UTC

[pulsar] branch master updated: Ensure proxy and broker stats are reported to Prometheus (with 0) even when no traffic is present (#2805)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d51af1   Ensure proxy and broker stats are reported to Prometheus (with 0) even when no traffic is present (#2805)
3d51af1 is described below

commit 3d51af1f2ab72f7e877a098c2b602b760a3c185d
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Oct 18 17:20:43 2018 -0700

     Ensure proxy and broker stats are reported to Prometheus (with 0) even when no traffic is present (#2805)
    
    * Ensure proxy and broker stats are reported to Prometheus (with 0) even when no traffic is present
    
    * Fixed unit test
---
 .../stats/prometheus/NamespaceStatsAggregator.java | 28 +++++++++++++++++
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 18 +++++------
 .../pulsar/proxy/server/DirectProxyHandler.java    | 10 ++----
 .../pulsar/proxy/server/ProxyConnection.java       | 24 +++++----------
 .../apache/pulsar/proxy/server/ProxyService.java   | 36 ++++++++++++++++------
 .../server/ProxyConnectionThrottlingTest.java      |  4 +--
 6 files changed, 75 insertions(+), 45 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 8a727f2..e3a0091 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -48,6 +48,8 @@ public class NamespaceStatsAggregator {
         AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
         TopicStats topicStats = localTopicStats.get();
 
+        printDefaultBrokerStats(stream, cluster);
+
         pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
             namespaceStats.reset();
 
@@ -151,6 +153,23 @@ public class NamespaceStatsAggregator {
         });
     }
 
+    private static void printDefaultBrokerStats(SimpleTextOutputStream stream, String cluster) {
+        // Print metrics with 0 values. This is necessary to have the available brokers being
+        // reported in the brokers dashboard even if they don't have any topic or traffic
+        metric(stream, cluster, "pulsar_topics_count", 0);
+        metric(stream, cluster, "pulsar_subscriptions_count", 0);
+        metric(stream, cluster, "pulsar_producers_count", 0);
+        metric(stream, cluster, "pulsar_consumers_count", 0);
+        metric(stream, cluster, "pulsar_rate_in", 0);
+        metric(stream, cluster, "pulsar_rate_out", 0);
+        metric(stream, cluster, "pulsar_throughput_in", 0);
+        metric(stream, cluster, "pulsar_throughput_out", 0);
+        metric(stream, cluster, "pulsar_storage_size", 0);
+        metric(stream, cluster, "pulsar_storage_write_rate", 0);
+        metric(stream, cluster, "pulsar_storage_read_rate", 0);
+        metric(stream, cluster, "pulsar_msg_backlog", 0);
+    }
+
     private static void printNamespaceStats(SimpleTextOutputStream stream, String cluster, String namespace,
                                             AggregatedNamespaceStats stats) {
         metric(stream, cluster, namespace, "pulsar_topics_count", stats.topicsCount);
@@ -216,6 +235,15 @@ public class NamespaceStatsAggregator {
         }
     }
 
+    private static void metric(SimpleTextOutputStream stream, String cluster, String name,
+            long value) {
+        TopicStats.metricType(stream, name);
+        stream.write(name)
+                .write("{cluster=\"").write(cluster).write("\"} ")
+                .write(value).write(' ').write(System.currentTimeMillis())
+                .write('\n');
+    }
+
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
                                long value) {
         TopicStats.metricType(stream, name);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 91bc0b8..90a4bd8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -82,13 +82,13 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
 
         cm = (List<Metric>) metrics.get("pulsar_producers_count");
-        assertEquals(cm.size(), 2);
-        assertEquals(cm.get(0).value, 1.0);
-        assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
-        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.size(), 3);
         assertEquals(cm.get(1).value, 1.0);
-        assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+        assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
         assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(2).value, 1.0);
+        assertEquals(cm.get(2).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+        assertEquals(cm.get(2).tags.get("namespace"), "my-property/use/my-ns");
 
         cm = (List<Metric>) metrics.get("topic_load_times_count");
         assertEquals(cm.size(), 1);
@@ -126,10 +126,10 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
 
         cm = (List<Metric>) metrics.get("pulsar_producers_count");
-        assertEquals(cm.size(), 1);
-        assertEquals(cm.get(0).value, 2.0);
-        assertEquals(cm.get(0).tags.get("topic"), null);
-        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.size(), 2);
+        assertEquals(cm.get(1).value, 2.0);
+        assertEquals(cm.get(1).tags.get("topic"), null);
+        assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
 
         p1.close();
         p2.close();
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index e6f0b5f..232db43 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -63,12 +63,6 @@ public class DirectProxyHandler {
 
     private final Authentication authentication;
 
-    static final Counter opsCounter = Counter
-            .build("pulsar_proxy_binary_ops", "Counter of proxy operations").create().register();
-
-    static final Counter bytesCounter = Counter
-            .build("pulsar_proxy_binary_bytes", "Counter of proxy bytes").create().register();
-
     public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl) {
         this.authentication = proxyConnection.getClientAuthentication();
         this.inboundChannel = proxyConnection.ctx().channel();
@@ -176,9 +170,9 @@ public class DirectProxyHandler {
                 break;
 
             case HandshakeCompleted:
-                opsCounter.inc();
+                ProxyService.opsCounter.inc();
                 if (msg instanceof ByteBuf) {
-                    bytesCounter.inc(((ByteBuf) msg).readableBytes());
+                    ProxyService.bytesCounter.inc(((ByteBuf) msg).readableBytes());
                 }
                 inboundChannel.writeAndFlush(msg).addListener(this);
                 break;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 42e4894..2ebea8a 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -92,17 +92,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         return client.getCnxPool();
     }
 
-    private static final Gauge activeConnections = Gauge
-            .build("pulsar_proxy_active_connections", "Number of connections currently active in the proxy").create()
-            .register();
 
-    private static final Counter newConnections = Counter
-            .build("pulsar_proxy_new_connections", "Counter of connections being opened in the proxy").create()
-            .register();
-
-    static final Counter rejectedConnections = Counter
-            .build("pulsar_proxy_rejected_connections", "Counter for connections rejected due to throttling").create()
-            .register();
 
     public ProxyConnection(ProxyService proxyService) {
         super(30, TimeUnit.SECONDS);
@@ -113,10 +103,10 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
     @Override
     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
         super.channelRegistered(ctx);
-        activeConnections.inc();
-        if (activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
+        ProxyService.activeConnections.inc();
+        if (ProxyService.activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
             ctx.close();
-            rejectedConnections.inc();
+            ProxyService.rejectedConnections.inc();
             return;
         }
     }
@@ -124,13 +114,13 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
     @Override
     public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
         super.channelUnregistered(ctx);
-        activeConnections.dec();
+        ProxyService.activeConnections.dec();
     }
 
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         super.channelActive(ctx);
-        newConnections.inc();
+        ProxyService.newConnections.inc();
         LOG.info("[{}] New connection opened", remoteAddress);
     }
 
@@ -169,9 +159,9 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         case ProxyConnectionToBroker:
             // Pass the buffer to the outbound connection and schedule next read
             // only if we can write on the connection
-            DirectProxyHandler.opsCounter.inc();
+            ProxyService.opsCounter.inc();
             if (msg instanceof ByteBuf) {
-                DirectProxyHandler.bytesCounter.inc(((ByteBuf) msg).readableBytes());
+                ProxyService.bytesCounter.inc(((ByteBuf) msg).readableBytes());
             }
             directProxyHandler.outboundChannel.writeAndFlush(msg).addListener(this);
             break;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 964cd1e..eaf5f04 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -21,15 +21,22 @@ package org.apache.pulsar.proxy.server;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.AdaptiveRecvByteBufAllocator;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
@@ -40,13 +47,6 @@ import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.AdaptiveRecvByteBufAllocator;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.util.concurrent.DefaultThreadFactory;
-
 /**
  * Pulsar proxy service
  */
@@ -72,6 +72,24 @@ public class ProxyService implements Closeable {
 
     private static final int numThreads = Runtime.getRuntime().availableProcessors();
 
+    static final Gauge activeConnections = Gauge
+            .build("pulsar_proxy_active_connections", "Number of connections currently active in the proxy").create()
+            .register();
+
+    static final Counter newConnections = Counter
+            .build("pulsar_proxy_new_connections", "Counter of connections being opened in the proxy").create()
+            .register();
+
+    static final Counter rejectedConnections = Counter
+            .build("pulsar_proxy_rejected_connections", "Counter for connections rejected due to throttling").create()
+            .register();
+
+    static final Counter opsCounter = Counter
+            .build("pulsar_proxy_binary_ops", "Counter of proxy operations").create().register();
+
+    static final Counter bytesCounter = Counter
+            .build("pulsar_proxy_binary_bytes", "Counter of proxy bytes").create().register();
+
     public ProxyService(ProxyConfiguration proxyConfig,
                         AuthenticationService authenticationService) throws IOException {
         checkNotNull(proxyConfig);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index 3e28242..20ebeb1 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -78,7 +78,7 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest {
         PulsarClient client2 = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort())
                 .build();
         Producer<byte[]> producer2;
-        Assert.assertEquals(ProxyConnection.rejectedConnections.get(), 0.0d);
+        Assert.assertEquals(ProxyService.rejectedConnections.get(), 0.0d);
         try {
             producer2 = client2.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic-1").create();
             producer2.send("Message 1".getBytes());
@@ -86,7 +86,7 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest {
         } catch (Exception ex) {
             // OK
         }
-        Assert.assertEquals(ProxyConnection.rejectedConnections.get(), 1.0d);
+        Assert.assertEquals(ProxyService.rejectedConnections.get(), 1.0d);
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(ProxyConnectionThrottlingTest.class);