You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/10/19 00:20:50 UTC
[pulsar] branch master updated: Ensure proxy and broker stats are
reported to Prometheus (with 0) even when no traffic is present (#2805)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3d51af1 Ensure proxy and broker stats are reported to Prometheus (with 0) even when no traffic is present (#2805)
3d51af1 is described below
commit 3d51af1f2ab72f7e877a098c2b602b760a3c185d
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Oct 18 17:20:43 2018 -0700
Ensure proxy and broker stats are reported to Prometheus (with 0) even when no traffic is present (#2805)
* Ensure proxy and broker stats are reported to Prometheus (with 0) even when no traffic is present
* Fixed unit test
---
.../stats/prometheus/NamespaceStatsAggregator.java | 28 +++++++++++++++++
.../pulsar/broker/stats/PrometheusMetricsTest.java | 18 +++++------
.../pulsar/proxy/server/DirectProxyHandler.java | 10 ++----
.../pulsar/proxy/server/ProxyConnection.java | 24 +++++----------
.../apache/pulsar/proxy/server/ProxyService.java | 36 ++++++++++++++++------
.../server/ProxyConnectionThrottlingTest.java | 4 +--
6 files changed, 75 insertions(+), 45 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 8a727f2..e3a0091 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -48,6 +48,8 @@ public class NamespaceStatsAggregator {
AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
TopicStats topicStats = localTopicStats.get();
+ printDefaultBrokerStats(stream, cluster);
+
pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
namespaceStats.reset();
@@ -151,6 +153,23 @@ public class NamespaceStatsAggregator {
});
}
+ private static void printDefaultBrokerStats(SimpleTextOutputStream stream, String cluster) {
+ // Print metrics with 0 values. This is necessary to have the available brokers being
+ // reported in the brokers dashboard even if they don't have any topic or traffic
+ metric(stream, cluster, "pulsar_topics_count", 0);
+ metric(stream, cluster, "pulsar_subscriptions_count", 0);
+ metric(stream, cluster, "pulsar_producers_count", 0);
+ metric(stream, cluster, "pulsar_consumers_count", 0);
+ metric(stream, cluster, "pulsar_rate_in", 0);
+ metric(stream, cluster, "pulsar_rate_out", 0);
+ metric(stream, cluster, "pulsar_throughput_in", 0);
+ metric(stream, cluster, "pulsar_throughput_out", 0);
+ metric(stream, cluster, "pulsar_storage_size", 0);
+ metric(stream, cluster, "pulsar_storage_write_rate", 0);
+ metric(stream, cluster, "pulsar_storage_read_rate", 0);
+ metric(stream, cluster, "pulsar_msg_backlog", 0);
+ }
+
private static void printNamespaceStats(SimpleTextOutputStream stream, String cluster, String namespace,
AggregatedNamespaceStats stats) {
metric(stream, cluster, namespace, "pulsar_topics_count", stats.topicsCount);
@@ -216,6 +235,15 @@ public class NamespaceStatsAggregator {
}
}
+ private static void metric(SimpleTextOutputStream stream, String cluster, String name,
+ long value) {
+ TopicStats.metricType(stream, name);
+ stream.write(name)
+ .write("{cluster=\"").write(cluster).write("\"} ")
+ .write(value).write(' ').write(System.currentTimeMillis())
+ .write('\n');
+ }
+
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
long value) {
TopicStats.metricType(stream, name);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 91bc0b8..90a4bd8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -82,13 +82,13 @@ public class PrometheusMetricsTest extends BrokerTestBase {
assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
cm = (List<Metric>) metrics.get("pulsar_producers_count");
- assertEquals(cm.size(), 2);
- assertEquals(cm.get(0).value, 1.0);
- assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
- assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+ assertEquals(cm.size(), 3);
assertEquals(cm.get(1).value, 1.0);
- assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+ assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
+ assertEquals(cm.get(2).value, 1.0);
+ assertEquals(cm.get(2).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+ assertEquals(cm.get(2).tags.get("namespace"), "my-property/use/my-ns");
cm = (List<Metric>) metrics.get("topic_load_times_count");
assertEquals(cm.size(), 1);
@@ -126,10 +126,10 @@ public class PrometheusMetricsTest extends BrokerTestBase {
assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
cm = (List<Metric>) metrics.get("pulsar_producers_count");
- assertEquals(cm.size(), 1);
- assertEquals(cm.get(0).value, 2.0);
- assertEquals(cm.get(0).tags.get("topic"), null);
- assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+ assertEquals(cm.size(), 2);
+ assertEquals(cm.get(1).value, 2.0);
+ assertEquals(cm.get(1).tags.get("topic"), null);
+ assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
p1.close();
p2.close();
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index e6f0b5f..232db43 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -63,12 +63,6 @@ public class DirectProxyHandler {
private final Authentication authentication;
- static final Counter opsCounter = Counter
- .build("pulsar_proxy_binary_ops", "Counter of proxy operations").create().register();
-
- static final Counter bytesCounter = Counter
- .build("pulsar_proxy_binary_bytes", "Counter of proxy bytes").create().register();
-
public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl) {
this.authentication = proxyConnection.getClientAuthentication();
this.inboundChannel = proxyConnection.ctx().channel();
@@ -176,9 +170,9 @@ public class DirectProxyHandler {
break;
case HandshakeCompleted:
- opsCounter.inc();
+ ProxyService.opsCounter.inc();
if (msg instanceof ByteBuf) {
- bytesCounter.inc(((ByteBuf) msg).readableBytes());
+ ProxyService.bytesCounter.inc(((ByteBuf) msg).readableBytes());
}
inboundChannel.writeAndFlush(msg).addListener(this);
break;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 42e4894..2ebea8a 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -92,17 +92,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
return client.getCnxPool();
}
- private static final Gauge activeConnections = Gauge
- .build("pulsar_proxy_active_connections", "Number of connections currently active in the proxy").create()
- .register();
- private static final Counter newConnections = Counter
- .build("pulsar_proxy_new_connections", "Counter of connections being opened in the proxy").create()
- .register();
-
- static final Counter rejectedConnections = Counter
- .build("pulsar_proxy_rejected_connections", "Counter for connections rejected due to throttling").create()
- .register();
public ProxyConnection(ProxyService proxyService) {
super(30, TimeUnit.SECONDS);
@@ -113,10 +103,10 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
- activeConnections.inc();
- if (activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
+ ProxyService.activeConnections.inc();
+ if (ProxyService.activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
ctx.close();
- rejectedConnections.inc();
+ ProxyService.rejectedConnections.inc();
return;
}
}
@@ -124,13 +114,13 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
super.channelUnregistered(ctx);
- activeConnections.dec();
+ ProxyService.activeConnections.dec();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
- newConnections.inc();
+ ProxyService.newConnections.inc();
LOG.info("[{}] New connection opened", remoteAddress);
}
@@ -169,9 +159,9 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
case ProxyConnectionToBroker:
// Pass the buffer to the outbound connection and schedule next read
// only if we can write on the connection
- DirectProxyHandler.opsCounter.inc();
+ ProxyService.opsCounter.inc();
if (msg instanceof ByteBuf) {
- DirectProxyHandler.bytesCounter.inc(((ByteBuf) msg).readableBytes());
+ ProxyService.bytesCounter.inc(((ByteBuf) msg).readableBytes());
}
directProxyHandler.outboundChannel.writeAndFlush(msg).addListener(this);
break;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 964cd1e..eaf5f04 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -21,15 +21,22 @@ package org.apache.pulsar.proxy.server;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.AdaptiveRecvByteBufAllocator;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
-import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
@@ -40,13 +47,6 @@ import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.AdaptiveRecvByteBufAllocator;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.util.concurrent.DefaultThreadFactory;
-
/**
* Pulsar proxy service
*/
@@ -72,6 +72,24 @@ public class ProxyService implements Closeable {
private static final int numThreads = Runtime.getRuntime().availableProcessors();
+ static final Gauge activeConnections = Gauge
+ .build("pulsar_proxy_active_connections", "Number of connections currently active in the proxy").create()
+ .register();
+
+ static final Counter newConnections = Counter
+ .build("pulsar_proxy_new_connections", "Counter of connections being opened in the proxy").create()
+ .register();
+
+ static final Counter rejectedConnections = Counter
+ .build("pulsar_proxy_rejected_connections", "Counter for connections rejected due to throttling").create()
+ .register();
+
+ static final Counter opsCounter = Counter
+ .build("pulsar_proxy_binary_ops", "Counter of proxy operations").create().register();
+
+ static final Counter bytesCounter = Counter
+ .build("pulsar_proxy_binary_bytes", "Counter of proxy bytes").create().register();
+
public ProxyService(ProxyConfiguration proxyConfig,
AuthenticationService authenticationService) throws IOException {
checkNotNull(proxyConfig);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index 3e28242..20ebeb1 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -78,7 +78,7 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest {
PulsarClient client2 = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort())
.build();
Producer<byte[]> producer2;
- Assert.assertEquals(ProxyConnection.rejectedConnections.get(), 0.0d);
+ Assert.assertEquals(ProxyService.rejectedConnections.get(), 0.0d);
try {
producer2 = client2.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic-1").create();
producer2.send("Message 1".getBytes());
@@ -86,7 +86,7 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest {
} catch (Exception ex) {
// OK
}
- Assert.assertEquals(ProxyConnection.rejectedConnections.get(), 1.0d);
+ Assert.assertEquals(ProxyService.rejectedConnections.get(), 1.0d);
}
private static final Logger LOG = LoggerFactory.getLogger(ProxyConnectionThrottlingTest.class);