You are viewing a plain-text version of this content; the link to its canonical (original) location was not preserved in this copy.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/06/07 15:16:07 UTC

[pulsar] branch master updated: [fix][broker][monitoring] provide namespace level metrics for SchemaRegistry. (#15870)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cca2fe687d [fix][broker][monitoring] provide namespace level metrics for SchemaRegistry. (#15870)
9cca2fe687d is described below

commit 9cca2fe687df93a9c37f95b5f3649496d9633f2b
Author: Tao Jiuming <95...@users.noreply.github.com>
AuthorDate: Tue Jun 7 23:15:59 2022 +0800

    [fix][broker][monitoring] provide namespace level metrics for SchemaRegistry. (#15870)
    
    * present namespace level metrics for schema registry.
    
    * present namespace level metrics for schema registry.
    
    * fix SchemaRegistryStats
---
 .../org/apache/pulsar/broker/PulsarService.java    |  2 +-
 .../service/schema/SchemaRegistryService.java      |  6 +-
 .../service/schema/SchemaRegistryServiceImpl.java  | 11 ++-
 .../broker/service/schema/SchemaRegistryStats.java | 95 +++++++++++++++++-----
 .../broker/service/schema/SchemaServiceTest.java   | 19 +++--
 5 files changed, 99 insertions(+), 34 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 22368aab521..920cbf8d689 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -727,7 +727,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
 
             schemaStorage = createAndStartSchemaStorage();
             schemaRegistryService = SchemaRegistryService.create(
-                    schemaStorage, config.getSchemaRegistryCompatibilityCheckers());
+                    schemaStorage, config.getSchemaRegistryCompatibilityCheckers(), this.executor);
 
             this.defaultOffloader = createManagedLedgerOffloader(
                     OffloadPoliciesImpl.create(this.getConfiguration().getProperties()));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
index 7acf3a65389..4668ba62850 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.schema;
 import com.google.common.collect.Maps;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.pulsar.broker.service.schema.validator.SchemaRegistryServiceWithSchemaDataValidator;
 import org.apache.pulsar.common.protocol.schema.SchemaStorage;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -42,13 +43,14 @@ public interface SchemaRegistryService extends SchemaRegistry {
         return checkers;
     }
 
-    static SchemaRegistryService create(SchemaStorage schemaStorage, Set<String> schemaRegistryCompatibilityCheckers) {
+    static SchemaRegistryService create(SchemaStorage schemaStorage, Set<String> schemaRegistryCompatibilityCheckers,
+                                        ScheduledExecutorService scheduler) {
         if (schemaStorage != null) {
             try {
                 Map<SchemaType, SchemaCompatibilityCheck> checkers = getCheckers(schemaRegistryCompatibilityCheckers);
                 checkers.put(SchemaType.KEY_VALUE, new KeyValueSchemaCompatibilityCheck(checkers));
                 return SchemaRegistryServiceWithSchemaDataValidator.of(
-                        new SchemaRegistryServiceImpl(schemaStorage, checkers));
+                        new SchemaRegistryServiceImpl(schemaStorage, checkers, scheduler));
             } catch (Exception e) {
                 LOG.warn("Unable to create schema registry storage, defaulting to empty storage", e);
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 1471e0fc07c..1ca0a46c855 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -38,6 +38,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 import javax.validation.constraints.NotNull;
 import lombok.extern.slf4j.Slf4j;
@@ -67,17 +68,19 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
 
     @VisibleForTesting
     SchemaRegistryServiceImpl(SchemaStorage schemaStorage,
-                              Map<SchemaType, SchemaCompatibilityCheck> compatibilityChecks, Clock clock) {
+                              Map<SchemaType, SchemaCompatibilityCheck> compatibilityChecks, Clock clock,
+                              ScheduledExecutorService scheduler) {
         this.schemaStorage = schemaStorage;
         this.compatibilityChecks = compatibilityChecks;
         this.clock = clock;
-        this.stats = SchemaRegistryStats.getInstance();
+        this.stats = SchemaRegistryStats.getInstance(scheduler);
     }
 
     @VisibleForTesting
     SchemaRegistryServiceImpl(SchemaStorage schemaStorage,
-                              Map<SchemaType, SchemaCompatibilityCheck> compatibilityChecks) {
-        this(schemaStorage, compatibilityChecks, Clock.systemUTC());
+                              Map<SchemaType, SchemaCompatibilityCheck> compatibilityChecks,
+                              ScheduledExecutorService scheduler) {
+        this(schemaStorage, compatibilityChecks, Clock.systemUTC(), scheduler);
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java
index 4f79c737fe7..042fa86d7ad 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java
@@ -21,10 +21,16 @@ package org.apache.pulsar.broker.service.schema;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Summary;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pulsar.common.naming.TopicName;
 
-class SchemaRegistryStats implements AutoCloseable {
-    private static final String SCHEMA_ID = "schema";
+class SchemaRegistryStats implements AutoCloseable, Runnable {
+    private static final String NAMESPACE = "namespace";
     private static final double[] QUANTILES = {0.50, 0.75, 0.95, 0.99, 0.999, 0.9999, 1};
     private static final AtomicBoolean CLOSED = new AtomicBoolean(false);
 
@@ -39,36 +45,43 @@ class SchemaRegistryStats implements AutoCloseable {
     private final Summary getOpsLatency;
     private final Summary putOpsLatency;
 
+    private final Map<String, Long> namespaceAccess = new ConcurrentHashMap<>();
+    private ScheduledFuture<?> future;
+
     private static volatile SchemaRegistryStats instance;
 
-    static synchronized SchemaRegistryStats getInstance() {
+    static synchronized SchemaRegistryStats getInstance(ScheduledExecutorService scheduler) {
         if (null == instance) {
-            instance = new SchemaRegistryStats();
+            instance = new SchemaRegistryStats(scheduler);
         }
 
         return instance;
     }
 
-    private SchemaRegistryStats() {
+    private SchemaRegistryStats(ScheduledExecutorService scheduler) {
         this.deleteOpsFailedCounter = Counter.build("pulsar_schema_del_ops_failed_count", "-")
-                .labelNames(SCHEMA_ID).create().register();
+                .labelNames(NAMESPACE).create().register();
         this.getOpsFailedCounter = Counter.build("pulsar_schema_get_ops_failed_count", "-")
-                .labelNames(SCHEMA_ID).create().register();
+                .labelNames(NAMESPACE).create().register();
         this.putOpsFailedCounter = Counter.build("pulsar_schema_put_ops_failed_count", "-")
-                .labelNames(SCHEMA_ID).create().register();
+                .labelNames(NAMESPACE).create().register();
 
         this.compatibleCounter = Counter.build("pulsar_schema_compatible_count", "-")
-                .labelNames(SCHEMA_ID).create().register();
+                .labelNames(NAMESPACE).create().register();
         this.incompatibleCounter = Counter.build("pulsar_schema_incompatible_count", "-")
-                .labelNames(SCHEMA_ID).create().register();
+                .labelNames(NAMESPACE).create().register();
 
         this.deleteOpsLatency = this.buildSummary("pulsar_schema_del_ops_latency", "-");
         this.getOpsLatency = this.buildSummary("pulsar_schema_get_ops_latency", "-");
         this.putOpsLatency = this.buildSummary("pulsar_schema_put_ops_latency", "-");
+
+        if (null != scheduler) {
+            this.future = scheduler.scheduleAtFixedRate(this, 1, 1, TimeUnit.MINUTES);
+        }
     }
 
     private Summary buildSummary(String name, String help) {
-        Summary.Builder builder = Summary.build(name, help).labelNames(SCHEMA_ID);
+        Summary.Builder builder = Summary.build(name, help).labelNames(NAMESPACE);
 
         for (double quantile : QUANTILES) {
             builder.quantile(quantile, 0.01D);
@@ -78,35 +91,60 @@ class SchemaRegistryStats implements AutoCloseable {
     }
 
     void recordDelFailed(String schemaId) {
-        this.deleteOpsFailedCounter.labels(schemaId).inc();
+        this.deleteOpsFailedCounter.labels(getNamespace(schemaId)).inc();
     }
 
     void recordGetFailed(String schemaId) {
-        this.getOpsFailedCounter.labels(schemaId).inc();
+        this.getOpsFailedCounter.labels(getNamespace(schemaId)).inc();
     }
 
     void recordPutFailed(String schemaId) {
-        this.putOpsFailedCounter.labels(schemaId).inc();
+        this.putOpsFailedCounter.labels(getNamespace(schemaId)).inc();
     }
 
     void recordDelLatency(String schemaId, long millis) {
-        this.deleteOpsLatency.labels(schemaId).observe(millis);
+        this.deleteOpsLatency.labels(getNamespace(schemaId)).observe(millis);
     }
 
     void recordGetLatency(String schemaId, long millis) {
-        this.getOpsLatency.labels(schemaId).observe(millis);
+        this.getOpsLatency.labels(getNamespace(schemaId)).observe(millis);
     }
 
     void recordPutLatency(String schemaId, long millis) {
-        this.putOpsLatency.labels(schemaId).observe(millis);
+        this.putOpsLatency.labels(getNamespace(schemaId)).observe(millis);
     }
 
     void recordSchemaIncompatible(String schemaId) {
-        this.incompatibleCounter.labels(schemaId).inc();
+        this.incompatibleCounter.labels(getNamespace(schemaId)).inc();
     }
 
     void recordSchemaCompatible(String schemaId) {
-        this.compatibleCounter.labels(schemaId).inc();
+        this.compatibleCounter.labels(getNamespace(schemaId)).inc();
+    }
+
+
+    private String getNamespace(String schemaId) {
+        String namespace;
+        try {
+            namespace = TopicName.get(schemaId).getNamespace();
+        } catch (IllegalArgumentException t) {
+            namespace = "unknown";
+        }
+
+        this.namespaceAccess.put(namespace, System.currentTimeMillis());
+        return namespace;
+    }
+
+
+    private void removeChild(String namespace) {
+        getOpsFailedCounter.remove(namespace);
+        putOpsFailedCounter.remove(namespace);
+        deleteOpsFailedCounter.remove(namespace);
+        compatibleCounter.remove(namespace);
+        incompatibleCounter.remove(namespace);
+        deleteOpsLatency.remove(namespace);
+        getOpsLatency.remove(namespace);
+        putOpsLatency.remove(namespace);
     }
 
     @Override
@@ -120,6 +158,25 @@ class SchemaRegistryStats implements AutoCloseable {
             CollectorRegistry.defaultRegistry.unregister(this.deleteOpsLatency);
             CollectorRegistry.defaultRegistry.unregister(this.getOpsLatency);
             CollectorRegistry.defaultRegistry.unregister(this.putOpsLatency);
+            if (null != this.future) {
+                this.future.cancel(false);
+            }
         }
     }
+
+    @Override
+    public void run() {
+        long now = System.currentTimeMillis();
+        long interval = TimeUnit.MINUTES.toMillis(5);
+
+        this.namespaceAccess.entrySet().removeIf(entry -> {
+            String namespace = entry.getKey();
+            long accessTime = entry.getValue();
+            if (now - accessTime > interval) {
+                this.removeChild(namespace);
+                return true;
+            }
+            return false;
+        });
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index 8a815409bd8..6c5eddc731a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
 import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.schema.LongSchemaVersion;
@@ -91,7 +92,7 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
         storage.start();
         Map<SchemaType, SchemaCompatibilityCheck> checkMap = new HashMap<>();
         checkMap.put(SchemaType.AVRO, new AvroSchemaCompatibilityCheck());
-        schemaRegistryService = new SchemaRegistryServiceImpl(storage, checkMap, MockClock);
+        schemaRegistryService = new SchemaRegistryServiceImpl(storage, checkMap, MockClock, null);
     }
 
     @AfterMethod(alwaysRun = true)
@@ -103,14 +104,16 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
 
     @Test
     public void testSchemaRegistryMetrics() throws Exception {
-        putSchema(schemaId1, schemaData1, version(0));
-        getSchema(schemaId1, version(0));
-        deleteSchema(schemaId1, version(1));
+        String schemaId = "tenant/ns/topic" + UUID.randomUUID();
+        String namespace = TopicName.get(schemaId).getNamespace();
+        putSchema(schemaId, schemaData1, version(0));
+        getSchema(schemaId, version(0));
+        deleteSchema(schemaId, version(1));
 
         ByteArrayOutputStream output = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsar, false, false, false, output);
         output.flush();
-        String metricsStr = new String(output.toByteArray(), StandardCharsets.UTF_8);
+        String metricsStr = output.toString(StandardCharsets.UTF_8);
         Multimap<String, PrometheusMetricsTest.Metric> metrics = PrometheusMetricsTest.parseMetrics(metricsStr);
 
         Collection<PrometheusMetricsTest.Metric> delMetrics = metrics.get("pulsar_schema_del_ops_failed_count");
@@ -122,19 +125,19 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
 
         Collection<PrometheusMetricsTest.Metric> deleteLatency = metrics.get("pulsar_schema_del_ops_latency_count");
         for (PrometheusMetricsTest.Metric metric : deleteLatency) {
-            Assert.assertEquals(metric.tags.get("schema"), schemaId1);
+            Assert.assertEquals(metric.tags.get("namespace"), namespace);
             Assert.assertTrue(metric.value > 0);
         }
 
         Collection<PrometheusMetricsTest.Metric> getLatency = metrics.get("pulsar_schema_get_ops_latency_count");
         for (PrometheusMetricsTest.Metric metric : getLatency) {
-            Assert.assertEquals(metric.tags.get("schema"), schemaId1);
+            Assert.assertEquals(metric.tags.get("namespace"), namespace);
             Assert.assertTrue(metric.value > 0);
         }
 
         Collection<PrometheusMetricsTest.Metric> putLatency = metrics.get("pulsar_schema_put_ops_latency_count");
         for (PrometheusMetricsTest.Metric metric : putLatency) {
-            Assert.assertEquals(metric.tags.get("schema"), schemaId1);
+            Assert.assertEquals(metric.tags.get("namespace"), namespace);
             Assert.assertTrue(metric.value > 0);
         }
     }