Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/07/02 14:15:17 UTC

[GitHub] [pulsar] KannarFr opened a new pull request #7433: [WIP] Enable pulsar component to send their metrics to topics

KannarFr opened a new pull request #7433:
URL: https://github.com/apache/pulsar/pull/7433


   Fixes #7419
   
   ### Motivation
   
   Enable Pulsar components to send their metrics to designated topics in a dedicated tenant.
   
   ### Modifications
   
   Create a MetricsSender interface and implement it with PulsarMetricsSender, which sends the metrics to Pulsar topics.
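The review comments below show only the Pulsar implementation. Judging from its `@Override` annotations, the MetricsSender interface declares at least `start()` and `getAndSendMetrics()`. Here is a minimal sketch of that shape, with a hypothetical InMemoryMetricsSender standing in for the Pulsar producer. The real interface in the PR may declare more methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Shape inferred from the @Override annotations in PulsarMetricsSender;
// the PR's actual interface may contain more methods.
interface MetricsSender {
    // Start sending metrics periodically in the background.
    void start();

    // Collect the current metrics and send them once.
    void getAndSendMetrics();
}

// Hypothetical implementation for illustration only: it "sends" each metrics
// entry by appending its string form to a list, much as PulsarMetricsSender
// passes metrics.toString() to a producer.
class InMemoryMetricsSender implements MetricsSender {
    private final Supplier<List<String>> source;
    final List<String> sent = new ArrayList<>();

    InMemoryMetricsSender(Supplier<List<String>> source) {
        this.source = source;
    }

    @Override
    public void start() {
        // A real sender would schedule getAndSendMetrics() at a fixed rate;
        // this sketch sends once, synchronously.
        getAndSendMetrics();
    }

    @Override
    public void getAndSendMetrics() {
        for (String metrics : source.get()) {
            sent.add(metrics);
        }
    }
}
```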
   
   ### Verifying this change
   
   This change added tests and can be verified as follows:
   
   - tests are (or will be) defined in PulsarMetricsSenderTest
   
   ### Does this pull request potentially affect one of the following parts:
   
   
   - configuration: new settings define the send interval and the target tenant.
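The diff reads these settings as `conf.tenant` and `conf.intervalInSeconds`. It publishes each broker's metrics to a topic built with `TopicName.get("persistent", NamespaceName.get(tenant, "brokers"), advertisedAddress)`. Below is a minimal sketch of such a configuration holder. The constructor, the validation, and the `topicFor` helper are assumptions added for illustration. The string format assumes TopicName renders as `persistent://<tenant>/<namespace>/<topic>`.

```java
// Field names match how PulsarMetricsSender reads them (conf.tenant,
// conf.intervalInSeconds); everything else here is illustrative.
class MetricsSenderConfiguration {
    final String tenant;
    final int intervalInSeconds;

    MetricsSenderConfiguration(String tenant, int intervalInSeconds) {
        if (tenant == null || tenant.isEmpty()) {
            throw new IllegalArgumentException("tenant must be set");
        }
        if (intervalInSeconds <= 0) {
            throw new IllegalArgumentException("intervalInSeconds must be positive");
        }
        this.tenant = tenant;
        this.intervalInSeconds = intervalInSeconds;
    }

    // Mirrors the PR's topic layout: one persistent topic per broker,
    // named after its advertised address, under <tenant>/brokers.
    String topicFor(String advertisedAddress) {
        return "persistent://" + tenant + "/brokers/" + advertisedAddress;
    }
}
```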
   
   ### Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? docs
     - If a feature is not applicable for documentation, explain why? WIP
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] KannarFr commented on a change in pull request #7433: [WIP] Enable pulsar component to send their metrics to topics

Posted by GitBox <gi...@apache.org>.
KannarFr commented on a change in pull request #7433:
URL: https://github.com/apache/pulsar/pull/7433#discussion_r449034572



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/sender/PulsarMetricsSender.java
##########
@@ -0,0 +1,92 @@
+package org.apache.pulsar.broker.stats.sender;
+
+import com.google.common.collect.Sets;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.stats.Metrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+
+public class PulsarMetricsSender implements MetricsSender {
+    private static final Logger log = LoggerFactory.getLogger(PulsarMetricsSender.class);
+
+    private PulsarService pulsar;
+    private MetricsSenderConfiguration conf;
+    private ScheduledExecutorService metricsSenderExecutor;
+
+    private TopicName topicToSend;
+
+    private Producer<String> producer;
+
+    public PulsarMetricsSender(PulsarService pulsar, MetricsSenderConfiguration conf) {
+        this.pulsar = pulsar;
+        this.conf = conf;
+        this.metricsSenderExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-metrics-sender"));
+
+        this.topicToSend = TopicName.get(
+                "persistent", NamespaceName.get(this.conf.tenant, "brokers"), this.pulsar.getAdvertisedAddress());
+
+        this.prepareTopics();
+
+        try {
+            this.producer = this.pulsar.getClient().newProducer(Schema.STRING)
+                .topic(this.topicToSend.toString())
+                .enableBatching(true)
+                .producerName("metrics-sender-" + this.pulsar.getAdvertisedAddress())
+                .create();
+        } catch (PulsarClientException | PulsarServerException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void prepareTopics() {
+        TenantInfo tenantInfo = new TenantInfo(
+                pulsar.getConfig().getSuperUserRoles(),
+                Sets.newHashSet(pulsar.getConfig().getClusterName()));
+
+        try {
+            this.pulsar.getAdminClient().tenants().createTenant(topicToSend.getTenant(), tenantInfo);
+            this.pulsar.getAdminClient().namespaces().createNamespace(topicToSend.getNamespace());
+            this.pulsar.getAdminClient().topics().createNonPartitionedTopic(topicToSend.toString());

Review comment:
       @jiazhai @sijie what is the best way to manage tenants, namespaces, and topics directly from the Pulsar code base? I'm currently using PulsarAdmin. WDYT?
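In the `prepareTopics()` code above, all three admin calls share one try block. If the tenant already exists, for example when a restarted broker or a second broker runs the same code, `createTenant` fails and the namespace and topic are never created. One way to make provisioning idempotent is to run each step on its own and treat "already exists" as success. With the Pulsar admin client that case should surface as `PulsarAdminException.ConflictException`. The minimal sketch below uses a stand-in `AlreadyExistsException` so it runs without Pulsar on the classpath. `ProvisioningStep` and `IdempotentProvisioner` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Stand-in for PulsarAdminException.ConflictException, so the sketch runs
// without the Pulsar admin client on the classpath.
class AlreadyExistsException extends Exception {
    AlreadyExistsException(String what) {
        super(what + " already exists");
    }
}

// One provisioning step, e.g. creating the tenant, the namespace, or the topic.
interface ProvisioningStep {
    void run() throws Exception;
}

final class IdempotentProvisioner {
    private IdempotentProvisioner() {}

    // Runs every step in iteration order (pass a LinkedHashMap to keep
    // tenant -> namespace -> topic). "Already exists" counts as success, so a
    // restarted broker still reaches the later steps. Any other failure stops
    // provisioning and propagates to the caller instead of being printed.
    // Returns the names of the steps whose resource was already present.
    static List<String> ensureAll(Map<String, ProvisioningStep> steps) throws Exception {
        List<String> alreadyPresent = new ArrayList<>();
        for (Map.Entry<String, ProvisioningStep> step : steps.entrySet()) {
            try {
                step.getValue().run();
            } catch (AlreadyExistsException e) {
                alreadyPresent.add(step.getKey());
            }
        }
        return alreadyPresent;
    }
}
```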







[GitHub] [pulsar] KannarFr edited a comment on pull request #7433: [Issue #7419] [WIP] Enable pulsar component to send their metrics to topics

Posted by GitBox <gi...@apache.org>.
KannarFr edited a comment on pull request #7433:
URL: https://github.com/apache/pulsar/pull/7433#issuecomment-687024882


   @sijie pull-based collection requires a more complex infrastructure, and it also requires the monitored system to buffer data between polls. So under load, you have to choose between eating memory and losing data. With push, the data goes out ASAP and the collector makes that decision. So push scales much better than pull, particularly for brokers, where the volume of monitoring data grows with the number of topics.
   
   WDYT?
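The following sketch makes the trade-off in this argument concrete. It assumes the broker holds samples in memory until a collector polls them. `PollBuffer` and its drop-oldest policy are hypothetical and not part of the PR. With a fixed capacity, memory stays bounded and falling behind costs dropped samples. An unbounded buffer would trade that for growing memory instead.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical buffer a broker would need if samples are held until a
// collector polls them. Capacity is fixed, so memory stays bounded and the
// cost of a slow collector is paid in dropped samples instead.
class PollBuffer<T> {
    private final int capacity;
    private final Deque<T> samples = new ArrayDeque<>();
    private long dropped = 0;

    PollBuffer(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        this.capacity = capacity;
    }

    // Called by the broker each time it produces a sample.
    synchronized void record(T sample) {
        if (samples.size() == capacity) {
            samples.removeFirst(); // drop the oldest sample to make room
            dropped++;
        }
        samples.addLast(sample);
    }

    // Called when the collector polls: hand over everything buffered and reset.
    synchronized List<T> drain() {
        List<T> out = new ArrayList<>(samples);
        samples.clear();
        return out;
    }

    synchronized long droppedCount() {
        return dropped;
    }
}
```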





[GitHub] [pulsar] KannarFr removed a comment on pull request #7433: [Issue #7419] [WIP] Enable pulsar component to send their metrics to topics

Posted by GitBox <gi...@apache.org>.
KannarFr removed a comment on pull request #7433:
URL: https://github.com/apache/pulsar/pull/7433#issuecomment-653033714


   I created the PR, but it's currently a POC to get your opinions. Should we use a schema based on the Metrics POJO?





[GitHub] [pulsar] KannarFr commented on a change in pull request #7433: [WIP] Enable pulsar component to send their metrics to topics

Posted by GitBox <gi...@apache.org>.
KannarFr commented on a change in pull request #7433:
URL: https://github.com/apache/pulsar/pull/7433#discussion_r449035352



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/sender/PulsarMetricsSender.java
##########
@@ -0,0 +1,92 @@
+package org.apache.pulsar.broker.stats.sender;
+
+import com.google.common.collect.Sets;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.stats.Metrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+
+public class PulsarMetricsSender implements MetricsSender {
+    private static final Logger log = LoggerFactory.getLogger(PulsarMetricsSender.class);
+
+    private PulsarService pulsar;
+    private MetricsSenderConfiguration conf;
+    private ScheduledExecutorService metricsSenderExecutor;
+
+    private TopicName topicToSend;
+
+    private Producer<String> producer;
+
+    public PulsarMetricsSender(PulsarService pulsar, MetricsSenderConfiguration conf) {
+        this.pulsar = pulsar;
+        this.conf = conf;
+        this.metricsSenderExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-metrics-sender"));
+
+        this.topicToSend = TopicName.get(
+                "persistent", NamespaceName.get(this.conf.tenant, "brokers"), this.pulsar.getAdvertisedAddress());
+
+        this.prepareTopics();
+
+        try {
+            this.producer = this.pulsar.getClient().newProducer(Schema.STRING)
+                .topic(this.topicToSend.toString())
+                .enableBatching(true)
+                .producerName("metrics-sender-" + this.pulsar.getAdvertisedAddress())
+                .create();
+        } catch (PulsarClientException | PulsarServerException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void prepareTopics() {
+        TenantInfo tenantInfo = new TenantInfo(
+                pulsar.getConfig().getSuperUserRoles(),
+                Sets.newHashSet(pulsar.getConfig().getClusterName()));
+
+        try {
+            this.pulsar.getAdminClient().tenants().createTenant(topicToSend.getTenant(), tenantInfo);
+            this.pulsar.getAdminClient().namespaces().createNamespace(topicToSend.getNamespace());
+            this.pulsar.getAdminClient().topics().createNonPartitionedTopic(topicToSend.toString());
+        } catch (PulsarAdminException | PulsarServerException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void start() {
+        final int interval = this.conf.intervalInSeconds;
+        log.info("Scheduling a thread to send metrics after [{}] seconds in background", interval);
+        this.metricsSenderExecutor.scheduleAtFixedRate(safeRun(this::getAndSendMetrics), interval, interval, TimeUnit.SECONDS);
+        getAndSendMetrics();
+    }
+
+    @Override
+    public void getAndSendMetrics() {
+        List<Metrics> metricsToSend = this.pulsar.getBrokerService().getTopicMetrics();
+
+        metricsToSend.forEach(metrics -> {
+            try {
+                log.info("Sending metrics [{}]", metrics.toString());
+                this.producer.send(metrics.toString());
+            } catch (PulsarClientException e) {
+                e.printStackTrace();
+            }
+        });
+    }
+}

Review comment:
       Should we add retries with backoff?
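In `getAndSendMetrics()`, a failed `producer.send()` is only reported with `e.printStackTrace()`, so that metrics entry is lost. Below is a minimal sketch of retry with exponential backoff. It assumes a generic Callable-based helper. `Backoff.retry` and its parameters are hypothetical and are not a Pulsar API.

```java
import java.util.concurrent.Callable;

// Generic retry-with-exponential-backoff helper; illustrative only.
final class Backoff {
    private Backoff() {}

    // Calls task until it succeeds or maxAttempts is reached. The delay starts
    // at initialDelayMs and doubles after each failure, capped at maxDelayMs.
    // The last failure is rethrown so the caller can log it instead of losing it.
    static <T> T retry(Callable<T> task, int maxAttempts, long initialDelayMs, long maxDelayMs)
            throws Exception {
        if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be >= 1");
        long delay = initialDelayMs;
        for (int attempt = 1; ; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                if (attempt >= maxAttempts) {
                    throw e;
                }
                Thread.sleep(delay);
                delay = Math.min(delay * 2, maxDelayMs);
            }
        }
    }
}
```

Inside the PR's sender, the call could look like `Backoff.retry(() -> producer.send(metrics.toString()), 3, 100, 1000)`. The sender runs on a single-threaded scheduler, and `scheduleAtFixedRate` never overlaps executions. Time spent backing off therefore delays the next scheduled run, so the total backoff should stay well below `intervalInSeconds`.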







[GitHub] [pulsar] KannarFr closed pull request #7433: [Issue #7419] [WIP] Enable pulsar component to send their metrics to topics

Posted by GitBox <gi...@apache.org>.
KannarFr closed pull request #7433:
URL: https://github.com/apache/pulsar/pull/7433


   





[GitHub] [pulsar] KannarFr commented on pull request #7433: [Issue #7419] [WIP] Enable pulsar component to send their metrics to topics

Posted by GitBox <gi...@apache.org>.
KannarFr commented on pull request #7433:
URL: https://github.com/apache/pulsar/pull/7433#issuecomment-687024882


   @sijie pull-based collection requires a more complex infrastructure, and it also requires the monitored system to buffer data between polls. So under load, you have to choose between eating memory and losing data. With push, the data goes out ASAP and the collector makes that decision. So push scales much better than pull, particularly for brokers, where the volume of monitoring data grows with the number of topics.
   
   WDYT?





[GitHub] [pulsar] KannarFr commented on pull request #7433: [WIP] Enable pulsar component to send their metrics to topics

Posted by GitBox <gi...@apache.org>.
KannarFr commented on pull request #7433:
URL: https://github.com/apache/pulsar/pull/7433#issuecomment-653033714


   I created the PR, but it's currently a POC to get your opinions. Should we use a schema based on the Metrics POJO?








[GitHub] [pulsar] sijie commented on pull request #7433: [Issue #7419] [WIP] Enable pulsar component to send their metrics to topics

Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #7433:
URL: https://github.com/apache/pulsar/pull/7433#issuecomment-759653339


   @KannarFr You can install a local agent per bookie to scrape the metrics from the local endpoint and push them to your metrics system. How is that different from adding a sender in Pulsar?
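The alternative sijie describes leaves Pulsar unchanged and moves the push into a separate process running next to each node. Below is a minimal sketch of such an agent. It assumes the scrape and push steps are supplied as functions, for example an HTTP GET against the node's local metrics endpoint and a client for the target metrics system. `ScrapeAndPushAgent` and its methods are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical per-node agent: scrape the local metrics endpoint on a fixed
// schedule and forward each scrape to an external metrics system. The scrape
// and push functions are injected so the sketch does not depend on any
// particular HTTP client or metrics backend.
class ScrapeAndPushAgent implements AutoCloseable {
    private final Supplier<String> scrape;
    private final Consumer<String> push;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    ScrapeAndPushAgent(Supplier<String> scrape, Consumer<String> push) {
        this.scrape = scrape;
        this.push = push;
    }

    // One scrape-and-forward cycle. Failures are reported but swallowed,
    // because an exception escaping a scheduleAtFixedRate task suppresses
    // all of its later executions.
    void runOnce() {
        try {
            push.accept(scrape.get());
        } catch (RuntimeException e) {
            System.err.println("metrics cycle failed: " + e);
        }
    }

    void start(long periodSeconds) {
        scheduler.scheduleAtFixedRate(this::runOnce, 0, periodSeconds, TimeUnit.SECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Structurally this is the same loop as PulsarMetricsSender: a single-threaded scheduler whose task is guarded against exceptions, as `safeRun` does in the PR. The question is only whether that loop runs inside the broker process or beside it.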

