You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/03/31 09:30:44 UTC

[GitHub] [pulsar] eolivelli commented on a change in pull request #13833: Offloader metrics

eolivelli commented on a change in pull request #13833:
URL: https://github.com/apache/pulsar/pull/13833#discussion_r839377115



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStats.java
##########
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Summary;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.common.annotation.InterfaceAudience;
+import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.common.naming.TopicName;
+
+
+/**
+ * Management Bean for a {@link LedgerOffloader}.
+ */
+@InterfaceAudience.LimitedPrivate
+@InterfaceStability.Stable
+public final class LedgerOffloaderStats implements Runnable {
+    private static final String TOPIC_LABEL = "topic";
+    private static final String NAMESPACE_LABEL = "namespace";
+    private static final String UNKNOWN = "unknown";
+
+    private final boolean exposeLedgerMetrics;
+    private final boolean exposeTopicLevelMetrics;
+    private final int interval;
+
+    private Counter offloadError;
+    private Gauge offloadRate;
+    private Summary readLedgerLatency;
+    private Counter writeStorageError;
+    private Counter readOffloadError;
+    private Gauge readOffloadRate;
+    private Summary readOffloadIndexLatency;
+    private Summary readOffloadDataLatency;
+
+    private Map<String, String> topic2Namespace;
+    private Map<String, Pair<LongAdder, LongAdder>> offloadAndReadOffloadBytesMap;
+
+    private static volatile LedgerOffloaderStats instance;

Review comment:
       please do not use singletons.
   I know it is harder to glue the things but one we add this Singleton we won't be able to easily drop it in the future

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
##########
@@ -1244,10 +1245,15 @@ public LedgerOffloader getManagedLedgerOffloader(NamespaceName namespaceName, Of
     public synchronized LedgerOffloader createManagedLedgerOffloader(OffloadPoliciesImpl offloadPolicies)
             throws PulsarServerException {
         try {
+            //initialize ledger offloader stats
+            LedgerOffloaderStats.initialize(config.isExposeManagedLedgerMetricsInPrometheus(),
+                    config.isExposeTopicLevelMetricsInPrometheus(), this.executor,
+                    config.getManagedLedgerStatsPeriodSeconds());
             if (StringUtils.isNotBlank(offloadPolicies.getManagedLedgerOffloadDriver())) {
                 checkNotNull(offloadPolicies.getOffloadersDirectory(),
                     "Offloader driver is configured to be '%s' but no offloaders directory is configured.",
                         offloadPolicies.getManagedLedgerOffloadDriver());
+
                 Offloaders offloaders = offloadersCache.getOrLoadOffloaders(

Review comment:
       we can create a single instance of LedgerOffloaderStats and add a field in PulsarService
   
   then you can pass the LedgerOffloaderStats to Offloaders

##########
File path: tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
##########
@@ -105,6 +108,7 @@ private FileSystemManagedLedgerOffloader(OffloadPoliciesImpl conf, OrderedSchedu
         this.assignmentScheduler = OrderedScheduler.newSchedulerBuilder()
                 .numThreads(conf.getManagedLedgerOffloadMaxThreads())
                 .name("offload-assignment").build();
+        this.offloaderStats = LedgerOffloaderStats.getInstance();

Review comment:
       please pass this into the constructor 

##########
File path: tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
##########
@@ -125,6 +129,7 @@ public FileSystemManagedLedgerOffloader(OffloadPoliciesImpl conf,
         this.assignmentScheduler = OrderedScheduler.newSchedulerBuilder()
                 .numThreads(conf.getManagedLedgerOffloadMaxThreads())
                 .name("offload-assignment").build();
+        this.offloaderStats = LedgerOffloaderStats.getInstance();

Review comment:
       please pass this into the constructor

##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -140,6 +144,7 @@ public static BlobStoreManagedLedgerOffloader create(TieredStorageConfiguration
                 config.getBucket(), config.getRegion());
 
         blobStores.putIfAbsent(config.getBlobStoreLocation(), config.getBlobStore());
+        this.offloaderStats = LedgerOffloaderStats.getInstance();

Review comment:
       please pass this into the constructor




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org