Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/08/07 20:03:44 UTC

[GitHub] [pulsar] michaeljmarshall commented on a change in pull request #11555: [Offload] Add offload performance metrics for tiered storage.

michaeljmarshall commented on a change in pull request #11555:
URL: https://github.com/apache/pulsar/pull/11555#discussion_r684665688



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderMXBeanImpl.java
##########
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.LedgerOffloaderMXBean;
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import org.apache.bookkeeper.mledger.util.StatsBuckets;
+import org.apache.pulsar.common.stats.Rate;
+
+public class LedgerOffloaderMXBeanImpl implements LedgerOffloaderMXBean {
+
+    public static final long[] READ_ENTRY_LATENCY_BUCKETS_USEC = {500, 1_000, 5_000, 10_000, 20_000, 50_000, 100_000,
+            200_000, 1000_000};
+
+    private final String name;
+
+    private final ScheduledExecutorService scheduler;
+    private long refreshIntervalSeconds;
+
+    // offloadTimeMap record the time cost by one round offload
+    private final ConcurrentHashMap<String, Rate> offloadTimeMap = new ConcurrentHashMap<>();
+    // offloadErrorMap record error ocurred
+    private final ConcurrentHashMap<String, Rate> offloadErrorMap = new ConcurrentHashMap<>();
+    // offloadRateMap record the offload rate
+    private final ConcurrentHashMap<String, Rate> offloadRateMap = new ConcurrentHashMap<>();
+
+
+    // readLedgerLatencyBucketsMap record the time cost by ledger read
+    private final ConcurrentHashMap<String, StatsBuckets> readLedgerLatencyBucketsMap = new ConcurrentHashMap<>();
+    // writeToStorageLatencyBucketsMap record the time cost by write to storage
+    private final ConcurrentHashMap<String, StatsBuckets> writeToStorageLatencyBucketsMap = new ConcurrentHashMap<>();
+    // writeToStorageErrorMap record the error occurred in write storage
+    private final ConcurrentHashMap<String, Rate> writeToStorageErrorMap = new ConcurrentHashMap<>();
+
+
+    // streamingWriteToStorageRateMap and streamingWriteToStorageErrorMap is for streamingOffload
+    private final ConcurrentHashMap<String, Rate> streamingWriteToStorageRateMap = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, Rate> streamingWriteToStorageErrorMap = new ConcurrentHashMap<>();
+
+    // readOffloadIndexLatencyBucketsMap and readOffloadDataLatencyBucketsMap are latency metrics about index and data
+    // readOffloadDataRateMap and readOffloadErrorMap is for reading offloaded data
+    private final ConcurrentHashMap<String, StatsBuckets> readOffloadIndexLatencyBucketsMap = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, StatsBuckets> readOffloadDataLatencyBucketsMap = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, Rate> readOffloadDataRateMap = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, Rate> readOffloadErrorMap = new ConcurrentHashMap<>();
+
+    public LedgerOffloaderMXBeanImpl(String name, long refreshIntervalSecond) {
+        this.name = name;
+        this.refreshIntervalSeconds = refreshIntervalSeconds;

Review comment:
    ```suggestion
    public LedgerOffloaderMXBeanImpl(String name, long refreshIntervalSeconds) {
        this.name = name;
        this.refreshIntervalSeconds = refreshIntervalSeconds;
    ```
    The constructor argument is currently unused because the parameter name is missing an `s`: with the typo, both sides of `this.refreshIntervalSeconds = refreshIntervalSeconds` resolve to the field, so the assignment is a self-assignment and the field keeps its default value of `0`.

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderMXBeanImpl.java
##########
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.LedgerOffloaderMXBean;
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import org.apache.bookkeeper.mledger.util.StatsBuckets;
+import org.apache.pulsar.common.stats.Rate;
+
+public class LedgerOffloaderMXBeanImpl implements LedgerOffloaderMXBean {
+
+    public static final long[] READ_ENTRY_LATENCY_BUCKETS_USEC = {500, 1_000, 5_000, 10_000, 20_000, 50_000, 100_000,
+            200_000, 1000_000};
+
+    private final String name;
+
+    private final ScheduledExecutorService scheduler;
+    private long refreshIntervalSeconds;
+
+    // offloadTimeMap record the time cost by one round offload
+    private final ConcurrentHashMap<String, Rate> offloadTimeMap = new ConcurrentHashMap<>();
+    // offloadErrorMap record error ocurred
+    private final ConcurrentHashMap<String, Rate> offloadErrorMap = new ConcurrentHashMap<>();
+    // offloadRateMap record the offload rate
+    private final ConcurrentHashMap<String, Rate> offloadRateMap = new ConcurrentHashMap<>();
+
+
+    // readLedgerLatencyBucketsMap record the time cost by ledger read
+    private final ConcurrentHashMap<String, StatsBuckets> readLedgerLatencyBucketsMap = new ConcurrentHashMap<>();
+    // writeToStorageLatencyBucketsMap record the time cost by write to storage
+    private final ConcurrentHashMap<String, StatsBuckets> writeToStorageLatencyBucketsMap = new ConcurrentHashMap<>();
+    // writeToStorageErrorMap record the error occurred in write storage
+    private final ConcurrentHashMap<String, Rate> writeToStorageErrorMap = new ConcurrentHashMap<>();
+
+
+    // streamingWriteToStorageRateMap and streamingWriteToStorageErrorMap is for streamingOffload
+    private final ConcurrentHashMap<String, Rate> streamingWriteToStorageRateMap = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, Rate> streamingWriteToStorageErrorMap = new ConcurrentHashMap<>();
+
+    // readOffloadIndexLatencyBucketsMap and readOffloadDataLatencyBucketsMap are latency metrics about index and data
+    // readOffloadDataRateMap and readOffloadErrorMap is for reading offloaded data
+    private final ConcurrentHashMap<String, StatsBuckets> readOffloadIndexLatencyBucketsMap = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, StatsBuckets> readOffloadDataLatencyBucketsMap = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, Rate> readOffloadDataRateMap = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, Rate> readOffloadErrorMap = new ConcurrentHashMap<>();
+
+    public LedgerOffloaderMXBeanImpl(String name, long refreshIntervalSecond) {
+        this.name = name;
+        this.refreshIntervalSeconds = refreshIntervalSeconds;
+        this.scheduler = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("ledger-offloader-metrics"));
+        this.scheduler.scheduleAtFixedRate(
+                safeRun(() -> refreshStats()), refreshIntervalSeconds, refreshIntervalSeconds, TimeUnit.SECONDS);
+    }
+
+    public void refreshStats() {
+        double seconds = refreshIntervalSeconds;
+
+        if (seconds <= 0.0) {
+            // skip refreshing stats
+            return;
+        }
+        offloadTimeMap.values().forEach(rate->rate.calculateRate(seconds));
+        offloadErrorMap.values().forEach(rate->rate.calculateRate(seconds));
+        offloadRateMap.values().forEach(rate-> rate.calculateRate(seconds));
+        readLedgerLatencyBucketsMap.values().forEach(stat-> stat.refresh());
+        writeToStorageLatencyBucketsMap.values().forEach(stat -> stat.refresh());
+        writeToStorageErrorMap.values().forEach(rate->rate.calculateRate(seconds));
+        streamingWriteToStorageRateMap.values().forEach(rate -> rate.calculateRate(seconds));
+        streamingWriteToStorageErrorMap.values().forEach(rate->rate.calculateRate(seconds));
+        readOffloadDataRateMap.values().forEach(rate->rate.calculateRate(seconds));
+        readOffloadErrorMap.values().forEach(rate->rate.calculateRate(seconds));
+        readOffloadIndexLatencyBucketsMap.values().forEach(stat->stat.refresh());
+        readOffloadDataLatencyBucketsMap.values().forEach(stat->stat.refresh());
+    }
+
+    //TODO: output metrics at the namespace level.
+
+    @Override
+    public String getName() {
+        return this.name;
+    }
+
+    @Override
+    public Map<String, Rate> getOffloadTimes() {
+        return offloadTimeMap;
+    }
+
+    @Override
+    public Map<String, Rate> getOffloadErrors() {
+        return offloadErrorMap;
+    }
+
+    @Override
+    public Map<String, Rate> getOffloadRates() {
+        return offloadRateMap;
+    }
+
+    @Override
+    public Map<String, StatsBuckets> getReadLedgerLatencyBuckets() {
+        return readLedgerLatencyBucketsMap;
+    }
+
+    @Override
+    public Map<String, StatsBuckets> getWriteToStorageLatencyBuckets() {
+        return writeToStorageLatencyBucketsMap;
+    }
+
+    @Override
+    public Map<String, Rate> getWriteToStorageErrors() {
+        return writeToStorageErrorMap;
+    }
+
+    @Override
+    public Map<String, Rate> getStreamingWriteToStorageRates() {
+        return streamingWriteToStorageRateMap;
+    }
+
+    @Override
+    public Map<String, Rate> getStreamingWriteToStorageErrors() {
+        return streamingWriteToStorageErrorMap;
+    }
+
+
+    @Override
+    public Map<String, StatsBuckets> getReadOffloadIndexLatencyBuckets() {
+        return readOffloadIndexLatencyBucketsMap;
+    }
+
+    @Override
+    public Map<String, StatsBuckets> getReadOffloadDataLatencyBuckets() {
+        return readOffloadDataLatencyBucketsMap;
+    }
+
+    @Override
+    public Map<String, Rate> getReadOffloadRates() {
+        return readOffloadDataRateMap;
+    }
+
+    @Override
+    public Map<String, Rate> getReadOffloadErrors() {
+        return readOffloadErrorMap;
+    }
+
+    public void recordOffloadTime(String managedLedgerName, long time, TimeUnit unit) {
+        Rate adder = offloadTimeMap.get(managedLedgerName);
+        if (adder == null) {
+            adder = offloadTimeMap.compute(managedLedgerName, (k, v) -> new Rate());
+        }
+        adder.recordEvent(unit.toMillis(time));

Review comment:
    As currently written, all of these `record` methods are open to a small race condition the first time metrics are recorded for a given `managedLedgerName`: two threads can both observe `null` from `get`, and the unconditional `compute` then installs a fresh `Rate`, discarding any events already recorded on the one it replaces. It is safer to use the `computeIfAbsent` method on the concurrent hash maps:

    ```suggestion
            Rate adder = offloadTimeMap.computeIfAbsent(managedLedgerName, k -> new Rate());
            adder.recordEvent(unit.toMillis(time));
    ```
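    To make the window concrete, here is a standalone sketch of the lost-update race (plain JDK, using `LongAdder` purely as a stand-in for Pulsar's `Rate`):

    ```java
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.LongAdder;

    public class ComputeRaceDemo {
        public static void main(String[] args) throws InterruptedException {
            ConcurrentHashMap<String, LongAdder> map = new ConcurrentHashMap<>();
            Runnable racy = () -> {
                LongAdder adder = map.get("ml");
                if (adder == null) {
                    // Unconditional compute: can replace an adder that another
                    // thread has already recorded into, losing those events.
                    adder = map.compute("ml", (k, v) -> new LongAdder());
                }
                adder.increment();
            };
            Thread t1 = new Thread(racy);
            Thread t2 = new Thread(racy);
            t1.start();
            t2.start();
            t1.join();
            t2.join();
            // May print 1 under the racy pattern; with
            // map.computeIfAbsent("ml", k -> new LongAdder()) it is always 2.
            System.out.println(map.get("ml").sum());
        }
    }
    ```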

##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
##########
@@ -113,9 +123,12 @@ private int readEntries() throws IOException {
 
     private List<ByteBuf> readNextEntriesFromLedger(long start, long maxNumberEntries) throws IOException {
         long end = Math.min(start + maxNumberEntries - 1, ledger.getLastAddConfirmed());
+        long startTime = System.nanoTime();
         try (LedgerEntries ledgerEntriesOnce = ledger.readAsync(start, end).get()) {
-            log.debug("read ledger entries. start: {}, end: {}", start, end);
-
+            log.debug("read ledger entries. start: {}, end: {} cost {}", start, end, TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startTime));

Review comment:
    Since this debug log line now computes `System.nanoTime()` and a microsecond conversion on every call, I think we should wrap it in a conditional checking `log.isDebugEnabled()`, so that work is skipped when debug logging is off.
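    A minimal sketch of the guarded form, assuming the `startTime`, `start`, and `end` variables from the diff above:

    ```java
    if (log.isDebugEnabled()) {
        // The nanoTime() read and unit conversion are only paid when debug logging is on.
        log.debug("read ledger entries. start: {}, end: {} cost {}", start, end,
                TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startTime));
    }
    ```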

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderMXBeanImpl.java
##########
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.LedgerOffloaderMXBean;
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import org.apache.bookkeeper.mledger.util.StatsBuckets;
+import org.apache.pulsar.common.stats.Rate;
+
+public class LedgerOffloaderMXBeanImpl implements LedgerOffloaderMXBean {

Review comment:
    Do we have a convention for handling topic metrics when topics are no longer hosted by the local broker? Once a managedLedger is added to the maps in this class, its entries are never removed, so if a topic is unloaded and picked up by another broker, both brokers would keep reporting these metrics for it. Perhaps this is okay though?
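    If cleanup turns out to be needed, a rough sketch of what a removal hook on this class could look like (the method itself is hypothetical, not part of this PR; the field names are the ones declared above):

    ```java
    // Hypothetical cleanup hook: drop all per-ledger entries when a topic is unloaded.
    public void removeManagedLedger(String managedLedgerName) {
        offloadTimeMap.remove(managedLedgerName);
        offloadErrorMap.remove(managedLedgerName);
        offloadRateMap.remove(managedLedgerName);
        readLedgerLatencyBucketsMap.remove(managedLedgerName);
        writeToStorageLatencyBucketsMap.remove(managedLedgerName);
        writeToStorageErrorMap.remove(managedLedgerName);
        streamingWriteToStorageRateMap.remove(managedLedgerName);
        streamingWriteToStorageErrorMap.remove(managedLedgerName);
        readOffloadIndexLatencyBucketsMap.remove(managedLedgerName);
        readOffloadDataLatencyBucketsMap.remove(managedLedgerName);
        readOffloadDataRateMap.remove(managedLedgerName);
        readOffloadErrorMap.remove(managedLedgerName);
    }
    ```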

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderMXBeanImpl.java
##########
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.LedgerOffloaderMXBean;
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import org.apache.bookkeeper.mledger.util.StatsBuckets;
+import org.apache.pulsar.common.stats.Rate;
+
+public class LedgerOffloaderMXBeanImpl implements LedgerOffloaderMXBean {
+
+    public static final long[] READ_ENTRY_LATENCY_BUCKETS_USEC = {500, 1_000, 5_000, 10_000, 20_000, 50_000, 100_000,
+            200_000, 1000_000};
+
+    private final String name;
+
+    private final ScheduledExecutorService scheduler;
+    private long refreshIntervalSeconds;
+
+    // offloadTimeMap record the time cost by one round offload
+    private final ConcurrentHashMap<String, Rate> offloadTimeMap = new ConcurrentHashMap<>();
+    // offloadErrorMap record error ocurred
+    private final ConcurrentHashMap<String, Rate> offloadErrorMap = new ConcurrentHashMap<>();
+    // offloadRateMap record the offload rate
+    private final ConcurrentHashMap<String, Rate> offloadRateMap = new ConcurrentHashMap<>();
+
+
+    // readLedgerLatencyBucketsMap record the time cost by ledger read
+    private final ConcurrentHashMap<String, StatsBuckets> readLedgerLatencyBucketsMap = new ConcurrentHashMap<>();
+    // writeToStorageLatencyBucketsMap record the time cost by write to storage
+    private final ConcurrentHashMap<String, StatsBuckets> writeToStorageLatencyBucketsMap = new ConcurrentHashMap<>();
+    // writeToStorageErrorMap record the error occurred in write storage
+    private final ConcurrentHashMap<String, Rate> writeToStorageErrorMap = new ConcurrentHashMap<>();
+
+
+    // streamingWriteToStorageRateMap and streamingWriteToStorageErrorMap is for streamingOffload
+    private final ConcurrentHashMap<String, Rate> streamingWriteToStorageRateMap = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, Rate> streamingWriteToStorageErrorMap = new ConcurrentHashMap<>();
+
+    // readOffloadIndexLatencyBucketsMap and readOffloadDataLatencyBucketsMap are latency metrics about index and data
+    // readOffloadDataRateMap and readOffloadErrorMap is for reading offloaded data
+    private final ConcurrentHashMap<String, StatsBuckets> readOffloadIndexLatencyBucketsMap = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, StatsBuckets> readOffloadDataLatencyBucketsMap = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, Rate> readOffloadDataRateMap = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, Rate> readOffloadErrorMap = new ConcurrentHashMap<>();
+
+    public LedgerOffloaderMXBeanImpl(String name, long refreshIntervalSecond) {
+        this.name = name;
+        this.refreshIntervalSeconds = refreshIntervalSeconds;
+        this.scheduler = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("ledger-offloader-metrics"));
+        this.scheduler.scheduleAtFixedRate(
+                safeRun(() -> refreshStats()), refreshIntervalSeconds, refreshIntervalSeconds, TimeUnit.SECONDS);
+    }
+
+    public void refreshStats() {
+        double seconds = refreshIntervalSeconds;
+
+        if (seconds <= 0.0) {
+            // skip refreshing stats
+            return;
+        }
+        offloadTimeMap.values().forEach(rate->rate.calculateRate(seconds));
+        offloadErrorMap.values().forEach(rate->rate.calculateRate(seconds));
+        offloadRateMap.values().forEach(rate-> rate.calculateRate(seconds));
+        readLedgerLatencyBucketsMap.values().forEach(stat-> stat.refresh());
+        writeToStorageLatencyBucketsMap.values().forEach(stat -> stat.refresh());
+        writeToStorageErrorMap.values().forEach(rate->rate.calculateRate(seconds));
+        streamingWriteToStorageRateMap.values().forEach(rate -> rate.calculateRate(seconds));
+        streamingWriteToStorageErrorMap.values().forEach(rate->rate.calculateRate(seconds));
+        readOffloadDataRateMap.values().forEach(rate->rate.calculateRate(seconds));
+        readOffloadErrorMap.values().forEach(rate->rate.calculateRate(seconds));
+        readOffloadIndexLatencyBucketsMap.values().forEach(stat->stat.refresh());
+        readOffloadDataLatencyBucketsMap.values().forEach(stat->stat.refresh());
+    }
+
+    //TODO: output metrics at the namespace level.

Review comment:
       It'd be great to provide an option for metrics rolled up at the namespace level.
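    For example, a rough sketch of such a rollup (the `namespaceOf` helper is hypothetical and would need to map a managedLedger name to its namespace; this also assumes `Rate#getRate()` exposes the computed per-second value):

    ```java
    // Hypothetical namespace rollup: sum per-ledger offload rates by namespace.
    Map<String, Double> byNamespace = new HashMap<>();
    offloadRateMap.forEach((ledgerName, rate) ->
            byNamespace.merge(namespaceOf(ledgerName), rate.getRate(), Double::sum));
    ```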

##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
##########
@@ -40,6 +42,8 @@
     private final ByteBuf buffer;
     private final long objectLen;
     private final int bufferSize;
+    private  LedgerOffloaderMXBeanImpl mbean;

Review comment:
    ```suggestion
    private LedgerOffloaderMXBeanImpl mbean;
    ```
    (Removes the double space after `private`.)

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
##########
@@ -125,6 +135,24 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b
         }
     }
 
+    private static void generateLedgerOffloaderMetrics(PulsarService pulsar, SimpleTextOutputStream stream) {
+        pulsar.getBrokerService().getTopics().keys().stream()
+                .filter(topic -> topic.startsWith(TopicDomain.persistent.value())).forEach(topic -> {
+            try {
+                ManagedLedgerConfig managedLedgerConfig = pulsar.getBrokerService()
+                        .getManagedLedgerConfig(TopicName.get(topic)).get();
+                LedgerOffloader ledgerOffloader = managedLedgerConfig.getLedgerOffloader();
+                if (ledgerOffloader != NullLedgerOffloader.INSTANCE && ledgerOffloader.getStats() != null) {
+                    String clusterName = pulsar.getConfiguration().getClusterName();
+                    List<Metrics> metrics = new LedgerOffloaderMetrics(pulsar, ledgerOffloader).generate();
+                    parseMetricsToPrometheusMetrics(metrics, clusterName, Collector.Type.GAUGE, stream);
+                }
+            } catch (Exception ex) {
+                log.error("generate ledger offloader metrics error", ex);
+            }
+        });
+    }

Review comment:
    I am concerned that this code block will introduce a performance bottleneck for Pulsar clusters with many topics. The logic performs a string comparison on every topic, then a config lookup per topic to determine whether it is offloaded. That is a fair amount of computation even when offloading is not enabled for any topic in the cluster.

    Additionally, is there a more efficient way to determine whether a topic is persistent? String comparison seems suboptimal.
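    One alternative for the domain check, sketched with `TopicName#getDomain()` (already available on Pulsar's `TopicName`; not necessarily cheaper, since `TopicName.get` parses the name, but clearer than a raw prefix match):

    ```java
    // Sketch: filter by topic domain via TopicName instead of a string prefix.
    pulsar.getBrokerService().getTopics().keys().stream()
            .filter(topic -> TopicName.get(topic).getDomain() == TopicDomain.persistent)
            .forEach(topic -> {
                // ... existing per-topic offloader metrics generation ...
            });
    ```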

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
##########
@@ -1130,9 +1131,43 @@ public void testMetricsProvider() throws IOException {
         BufferedReader reader = new BufferedReader(isReader);
         StringBuffer sb = new StringBuffer();
         String str;
-        while((str = reader.readLine()) != null){
+        while ((str = reader.readLine()) != null) {
             sb.append(str);
         }
+        System.out.println(sb.toString());
+        Assert.assertTrue(sb.toString().contains("test_metrics"));
+    }
+
+    @Test
+    public void testTieredStorageMetrics() throws IOException, PulsarAdminException {
+        final String tenant = "public";
+        final String namespace = "public/default";
+        final String topic = "persistent://" + namespace + "/tiered-storage";
+        Set<String> allowedClusters = new HashSet<>();
+        allowedClusters.add("test");
+        admin.tenants().createTenant(tenant, TenantInfo.builder().allowedClusters(allowedClusters).build());
+        admin.namespaces().createNamespace(namespace, 4);
+        try {
+            Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
+            producer.close();
+        } catch (Exception e) {
+            fail(e.getMessage());
+        }
+        //

Review comment:
    ```suggestion
    ```
    (Removes the leftover empty `//` comment.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org