You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/19 00:20:45 UTC

[GitHub] merlimat closed pull request #2805: Ensure proxy and broker stats are reported to Prometheus (with 0) even when no traffic is present

merlimat closed pull request #2805:  Ensure proxy and broker stats are reported to Prometheus (with 0) even when no traffic is present
URL: https://github.com/apache/pulsar/pull/2805
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 8a727f2d39..e3a009129f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -48,6 +48,8 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b
         AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
         TopicStats topicStats = localTopicStats.get();
 
+        printDefaultBrokerStats(stream, cluster);
+
         pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
             namespaceStats.reset();
 
@@ -151,6 +153,23 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
         });
     }
 
+    private static void printDefaultBrokerStats(SimpleTextOutputStream stream, String cluster) {
+        // Print metrics with 0 values. This is necessary to have the available brokers being
+        // reported in the brokers dashboard even if they don't have any topic or traffic
+        metric(stream, cluster, "pulsar_topics_count", 0);
+        metric(stream, cluster, "pulsar_subscriptions_count", 0);
+        metric(stream, cluster, "pulsar_producers_count", 0);
+        metric(stream, cluster, "pulsar_consumers_count", 0);
+        metric(stream, cluster, "pulsar_rate_in", 0);
+        metric(stream, cluster, "pulsar_rate_out", 0);
+        metric(stream, cluster, "pulsar_throughput_in", 0);
+        metric(stream, cluster, "pulsar_throughput_out", 0);
+        metric(stream, cluster, "pulsar_storage_size", 0);
+        metric(stream, cluster, "pulsar_storage_write_rate", 0);
+        metric(stream, cluster, "pulsar_storage_read_rate", 0);
+        metric(stream, cluster, "pulsar_msg_backlog", 0);
+    }
+
     private static void printNamespaceStats(SimpleTextOutputStream stream, String cluster, String namespace,
                                             AggregatedNamespaceStats stats) {
         metric(stream, cluster, namespace, "pulsar_topics_count", stats.topicsCount);
@@ -216,6 +235,15 @@ private static void printNamespaceStats(SimpleTextOutputStream stream, String cl
         }
     }
 
+    private static void metric(SimpleTextOutputStream stream, String cluster, String name,
+            long value) {
+        TopicStats.metricType(stream, name);
+        stream.write(name)
+                .write("{cluster=\"").write(cluster).write("\"} ")
+                .write(value).write(' ').write(System.currentTimeMillis())
+                .write('\n');
+    }
+
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
                                long value) {
         TopicStats.metricType(stream, name);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 91bc0b826b..90a4bd881d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -82,13 +82,13 @@ public void testPerTopicStats() throws Exception {
         assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
 
         cm = (List<Metric>) metrics.get("pulsar_producers_count");
-        assertEquals(cm.size(), 2);
-        assertEquals(cm.get(0).value, 1.0);
-        assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
-        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.size(), 3);
         assertEquals(cm.get(1).value, 1.0);
-        assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+        assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
         assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(2).value, 1.0);
+        assertEquals(cm.get(2).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+        assertEquals(cm.get(2).tags.get("namespace"), "my-property/use/my-ns");
 
         cm = (List<Metric>) metrics.get("topic_load_times_count");
         assertEquals(cm.size(), 1);
@@ -126,10 +126,10 @@ public void testPerNamespaceStats() throws Exception {
         assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
 
         cm = (List<Metric>) metrics.get("pulsar_producers_count");
-        assertEquals(cm.size(), 1);
-        assertEquals(cm.get(0).value, 2.0);
-        assertEquals(cm.get(0).tags.get("topic"), null);
-        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.size(), 2);
+        assertEquals(cm.get(1).value, 2.0);
+        assertEquals(cm.get(1).tags.get("topic"), null);
+        assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
 
         p1.close();
         p2.close();
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index e6f0b5fcd0..232db4307e 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -63,12 +63,6 @@
 
     private final Authentication authentication;
 
-    static final Counter opsCounter = Counter
-            .build("pulsar_proxy_binary_ops", "Counter of proxy operations").create().register();
-
-    static final Counter bytesCounter = Counter
-            .build("pulsar_proxy_binary_bytes", "Counter of proxy bytes").create().register();
-
     public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl) {
         this.authentication = proxyConnection.getClientAuthentication();
         this.inboundChannel = proxyConnection.ctx().channel();
@@ -176,9 +170,9 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce
                 break;
 
             case HandshakeCompleted:
-                opsCounter.inc();
+                ProxyService.opsCounter.inc();
                 if (msg instanceof ByteBuf) {
-                    bytesCounter.inc(((ByteBuf) msg).readableBytes());
+                    ProxyService.bytesCounter.inc(((ByteBuf) msg).readableBytes());
                 }
                 inboundChannel.writeAndFlush(msg).addListener(this);
                 break;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 42e4894ca2..2ebea8a774 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -92,17 +92,7 @@ ConnectionPool getConnectionPool() {
         return client.getCnxPool();
     }
 
-    private static final Gauge activeConnections = Gauge
-            .build("pulsar_proxy_active_connections", "Number of connections currently active in the proxy").create()
-            .register();
 
-    private static final Counter newConnections = Counter
-            .build("pulsar_proxy_new_connections", "Counter of connections being opened in the proxy").create()
-            .register();
-
-    static final Counter rejectedConnections = Counter
-            .build("pulsar_proxy_rejected_connections", "Counter for connections rejected due to throttling").create()
-            .register();
 
     public ProxyConnection(ProxyService proxyService) {
         super(30, TimeUnit.SECONDS);
@@ -113,10 +103,10 @@ public ProxyConnection(ProxyService proxyService) {
     @Override
     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
         super.channelRegistered(ctx);
-        activeConnections.inc();
-        if (activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
+        ProxyService.activeConnections.inc();
+        if (ProxyService.activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
             ctx.close();
-            rejectedConnections.inc();
+            ProxyService.rejectedConnections.inc();
             return;
         }
     }
@@ -124,13 +114,13 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
     @Override
     public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
         super.channelUnregistered(ctx);
-        activeConnections.dec();
+        ProxyService.activeConnections.dec();
     }
 
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         super.channelActive(ctx);
-        newConnections.inc();
+        ProxyService.newConnections.inc();
         LOG.info("[{}] New connection opened", remoteAddress);
     }
 
@@ -169,9 +159,9 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce
         case ProxyConnectionToBroker:
             // Pass the buffer to the outbound connection and schedule next read
             // only if we can write on the connection
-            DirectProxyHandler.opsCounter.inc();
+            ProxyService.opsCounter.inc();
             if (msg instanceof ByteBuf) {
-                DirectProxyHandler.bytesCounter.inc(((ByteBuf) msg).readableBytes());
+                ProxyService.bytesCounter.inc(((ByteBuf) msg).readableBytes());
             }
             directProxyHandler.outboundChannel.writeAndFlush(msg).addListener(this);
             break;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 964cd1e8d9..eaf5f04fe7 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -21,15 +21,22 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.AdaptiveRecvByteBufAllocator;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
@@ -40,13 +47,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.AdaptiveRecvByteBufAllocator;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.util.concurrent.DefaultThreadFactory;
-
 /**
  * Pulsar proxy service
  */
@@ -72,6 +72,24 @@
 
     private static final int numThreads = Runtime.getRuntime().availableProcessors();
 
+    static final Gauge activeConnections = Gauge
+            .build("pulsar_proxy_active_connections", "Number of connections currently active in the proxy").create()
+            .register();
+
+    static final Counter newConnections = Counter
+            .build("pulsar_proxy_new_connections", "Counter of connections being opened in the proxy").create()
+            .register();
+
+    static final Counter rejectedConnections = Counter
+            .build("pulsar_proxy_rejected_connections", "Counter for connections rejected due to throttling").create()
+            .register();
+
+    static final Counter opsCounter = Counter
+            .build("pulsar_proxy_binary_ops", "Counter of proxy operations").create().register();
+
+    static final Counter bytesCounter = Counter
+            .build("pulsar_proxy_binary_bytes", "Counter of proxy bytes").create().register();
+
     public ProxyService(ProxyConfiguration proxyConfig,
                         AuthenticationService authenticationService) throws IOException {
         checkNotNull(proxyConfig);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index 3e28242092..20ebeb1187 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -78,7 +78,7 @@ public void testInboundConnection() throws Exception {
         PulsarClient client2 = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort())
                 .build();
         Producer<byte[]> producer2;
-        Assert.assertEquals(ProxyConnection.rejectedConnections.get(), 0.0d);
+        Assert.assertEquals(ProxyService.rejectedConnections.get(), 0.0d);
         try {
             producer2 = client2.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic-1").create();
             producer2.send("Message 1".getBytes());
@@ -86,7 +86,7 @@ public void testInboundConnection() throws Exception {
         } catch (Exception ex) {
             // OK
         }
-        Assert.assertEquals(ProxyConnection.rejectedConnections.get(), 1.0d);
+        Assert.assertEquals(ProxyService.rejectedConnections.get(), 1.0d);
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(ProxyConnectionThrottlingTest.class);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services