You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/03 18:15:35 UTC

[GitHub] merlimat closed pull request #2686: [functions][metrics] Update function metrics

merlimat closed pull request #2686: [functions][metrics] Update function metrics
URL: https://github.com/apache/pulsar/pull/2686
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise show it once the PR is merged:

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 36343cfe30..8a727f2d39 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -24,12 +24,9 @@
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
-import org.eclipse.jetty.util.ConcurrentHashSet;
 
 import io.netty.util.concurrent.FastThreadLocal;
 
-import java.util.Set;
-
 public class NamespaceStatsAggregator {
 
     private static FastThreadLocal<AggregatedNamespaceStats> localNamespaceStats = new FastThreadLocal<AggregatedNamespaceStats>() {
@@ -46,8 +43,6 @@ protected TopicStats initialValue() throws Exception {
         }
     };
 
-    private static Set<String> METRIC_TYPES = new ConcurrentHashSet<>();
-
     public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, SimpleTextOutputStream stream) {
         String cluster = pulsar.getConfiguration().getClusterName();
         AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
@@ -56,8 +51,6 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b
         pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
             namespaceStats.reset();
 
-            METRIC_TYPES.forEach(metric -> TopicStats.metricType(stream, metric));
-
             bundlesMap.forEach((bundle, topicsMap) -> {
                 topicsMap.forEach((name, topic) -> {
                     getTopicStats(topic, topicStats, includeConsumerMetrics);
@@ -225,33 +218,21 @@ private static void printNamespaceStats(SimpleTextOutputStream stream, String cl
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
                                long value) {
-        if (!METRIC_TYPES.contains(name)) {
-            TopicStats.metricType(stream, name);
-            METRIC_TYPES.add(name);
-        }
-
+        TopicStats.metricType(stream, name);
         stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
     }
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
                                double value) {
-        if (!METRIC_TYPES.contains(name)) {
-            TopicStats.metricType(stream, name);
-            METRIC_TYPES.add(name);
-        }
-
+        TopicStats.metricType(stream, name);
         stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
     }
 
     private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace,
                                                 String name, String remoteCluster, double value) {
-        if (!METRIC_TYPES.contains(name)) {
-            TopicStats.metricType(stream, name);
-            METRIC_TYPES.add(name);
-        }
-
+        TopicStats.metricType(stream, name);
         stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace);
         stream.write("\",remote_cluster=\"").write(remoteCluster).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index ff946b95fd..5968b8cbe3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -20,12 +20,10 @@
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
 import org.apache.bookkeeper.mledger.util.StatsBuckets;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
-import org.eclipse.jetty.util.ConcurrentHashSet;
 
 class TopicStats {
 
@@ -47,7 +45,6 @@
 
     Map<String, AggregatedReplicationStats> replicationStats = new HashMap<>();
     Map<String, AggregatedSubscriptionStats> subscriptionStats = new HashMap<>();
-    private static Set<String> METRIC_TYPES = new ConcurrentHashSet<>();
 
     public void reset() {
         subscriptionsCount = 0;
@@ -71,9 +68,6 @@ public void reset() {
 
     static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
                                 TopicStats stats) {
-
-        METRIC_TYPES.forEach(metric -> metricType(stream, metric));
-
         metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount);
         metric(stream, cluster, namespace, topic, "pulsar_producers_count", stats.producersCount);
         metric(stream, cluster, namespace, topic, "pulsar_consumers_count", stats.consumersCount);
@@ -140,11 +134,7 @@ static void metricType(SimpleTextOutputStream stream, String name) {
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
                                String name, double value) {
-        if (!METRIC_TYPES.contains(name)) {
-            metricType(stream, name);
-            METRIC_TYPES.add(name);
-        }
-
+        metricType(stream, name);
         stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
                 .write("\",topic=\"").write(topic).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
@@ -152,11 +142,7 @@ private static void metric(SimpleTextOutputStream stream, String cluster, String
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription,
                                String name, long value) {
-        if (!METRIC_TYPES.contains(name)) {
-            metricType(stream, name);
-            METRIC_TYPES.add(name);
-        }
-
+        metricType(stream, name);
         stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
                 .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
@@ -164,11 +150,7 @@ private static void metric(SimpleTextOutputStream stream, String cluster, String
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription,
                                String name, double value) {
-        if (!METRIC_TYPES.contains(name)) {
-            metricType(stream, name);
-            METRIC_TYPES.add(name);
-        }
-
+        metricType(stream, name);
         stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
                 .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
@@ -176,11 +158,7 @@ private static void metric(SimpleTextOutputStream stream, String cluster, String
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription,
                                String consumerName, long consumerId, String name, long value) {
-        if (!METRIC_TYPES.contains(name)) {
-            metricType(stream, name);
-            METRIC_TYPES.add(name);
-        }
-
+        metricType(stream, name);
         stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
                 .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription)
                 .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId).write("\"} ");
@@ -189,11 +167,7 @@ private static void metric(SimpleTextOutputStream stream, String cluster, String
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription,
                                String consumerName, long consumerId, String name, double value) {
-        if (!METRIC_TYPES.contains(name)) {
-            metricType(stream, name);
-            METRIC_TYPES.add(name);
-        }
-
+        metricType(stream, name);
         stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
                 .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription)
                 .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId).write("\"} ");
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
index 907afaf198..be7c88b9b1 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -37,12 +37,8 @@
 
     private static final Logger log = LoggerFactory.getLogger(FunctionsStatsGenerator.class);
 
-    private static Set<String> METRIC_TYPES = new ConcurrentHashSet<>();
-
     public static void generate(WorkerService workerService, String cluster, SimpleTextOutputStream out) {
         if (workerService != null) {
-            METRIC_TYPES.forEach(metric -> metricType(out, metric));
-
             Map<String, FunctionRuntimeInfo> functionRuntimes
                     = workerService.getFunctionRuntimeManager().getFunctionRuntimeInfos();
 
@@ -98,11 +94,7 @@ private static void metricType(SimpleTextOutputStream stream, String name) {
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace,
                                String functionName, String metricName, int instanceId, double value) {
-        if (!METRIC_TYPES.contains(metricName)) {
-            metricType(stream, metricName);
-            METRIC_TYPES.add(metricName);
-        }
-
+        metricType(stream, metricName);
         stream.write(metricName).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
                 .write("\",name=\"").write(functionName).write("\",instanceId=\"").write(instanceId).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java
index 0b8706041a..b94dfd8f51 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java
@@ -53,7 +53,10 @@ public Response getMetrics() throws JsonProcessingException {
                 out.write(payload, arrayOffset, readableBytes);
                 out.flush();
             };
-            return Response.ok(streamOut).build();
+            return Response
+                .ok(streamOut)
+                .type(MediaType.TEXT_PLAIN_TYPE)
+                .build();
         } finally {
             buf.release();
         }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services