Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/08/16 09:15:30 UTC

[GitHub] [pulsar] hangc0276 commented on a change in pull request #11555: [Offload] Add offload performance metrics for tiered storage.

hangc0276 commented on a change in pull request #11555:
URL: https://github.com/apache/pulsar/pull/11555#discussion_r689339248



##########
File path: pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
##########
@@ -172,6 +172,9 @@
     @Configuration
     @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private Integer managedLedgerOffloadReadBufferSizeInBytes;
+    @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
+    private Integer refreshStatsInterval = 60;

Review comment:
       We'd better turn the offload metrics off by default, and suggest that users set `refreshStatsInterval` to `-1` (disabled) or `60`.
   Each topic offloader creates its own `LedgerOffloaderMXBeanImpl` instance, and each instance uses a dedicated thread to refresh stats. If this metric generator is turned on for every topic offloader, CPU usage will increase.

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderMXBeanImpl.java
##########
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.LedgerOffloaderMXBean;
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import org.apache.bookkeeper.mledger.util.StatsBuckets;
+import org.apache.pulsar.common.stats.Rate;
+
+public class LedgerOffloaderMXBeanImpl implements LedgerOffloaderMXBean {
+
+    public static final long[] READ_ENTRY_LATENCY_BUCKETS_USEC = {500, 1_000, 5_000, 10_000, 20_000, 50_000, 100_000,
+            200_000, 1000_000};
+
+    private static final long OUT_DATED_CLEAN_PERIOD_MILLISECOND = 10 * 60 * 1000;
+
+    private final String driverName;
+
+    private final ScheduledExecutorService scheduler;
+    private long refreshIntervalSeconds;
+    private Map<String, Long> topicAccessTime;
+
+    // offloadTimeMap record the time cost by one round offload
+    private final ConcurrentHashMap<String, Rate> offloadTimeMap = new ConcurrentHashMap<>();
+    // offloadErrorMap record error ocurred
+    private final ConcurrentHashMap<String, Rate> offloadErrorMap = new ConcurrentHashMap<>();
+    // offloadRateMap record the offload rate
+    private final ConcurrentHashMap<String, Rate> offloadRateMap = new ConcurrentHashMap<>();
+
+
+    // readLedgerLatencyBucketsMap record the time cost by ledger read
+    private final ConcurrentHashMap<String, StatsBuckets> readLedgerLatencyBucketsMap = new ConcurrentHashMap<>();
+    // writeToStorageLatencyBucketsMap record the time cost by write to storage
+    private final ConcurrentHashMap<String, StatsBuckets> writeToStorageLatencyBucketsMap = new ConcurrentHashMap<>();
+    // writeToStorageErrorMap record the error occurred in write storage
+    private final ConcurrentHashMap<String, Rate> writeToStorageErrorMap = new ConcurrentHashMap<>();
+
+
+    // streamingWriteToStorageRateMap and streamingWriteToStorageErrorMap is for streamingOffload
+    private final ConcurrentHashMap<String, Rate> streamingWriteToStorageRateMap = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, Rate> streamingWriteToStorageErrorMap = new ConcurrentHashMap<>();
+
+    // readOffloadIndexLatencyBucketsMap and readOffloadDataLatencyBucketsMap are latency metrics about index and data
+    // readOffloadDataRateMap and readOffloadErrorMap is for reading offloaded data
+    private final ConcurrentHashMap<String, StatsBuckets> readOffloadIndexLatencyBucketsMap = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, StatsBuckets> readOffloadDataLatencyBucketsMap = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, Rate> readOffloadDataRateMap = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, Rate> readOffloadErrorMap = new ConcurrentHashMap<>();
+
+    public LedgerOffloaderMXBeanImpl(String driverName, long refreshIntervalSeconds) {
+        this.driverName = driverName;
+        this.refreshIntervalSeconds = refreshIntervalSeconds;

Review comment:
       We should check `refreshIntervalSeconds > 0` before creating the single-thread scheduler that refreshes stats. If we create many threads that only run no-ops, we waste CPU resources.
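
   A minimal sketch of that check, assuming a non-positive interval means "metrics disabled". `ConditionalStatsRefresher`, `isEnabled()` and the `refreshTask` parameter are hypothetical names used for illustration only; they are not part of this PR.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for LedgerOffloaderMXBeanImpl; only the scheduling logic is shown.
class ConditionalStatsRefresher implements AutoCloseable {

    // Stays null when metrics are disabled, so no thread is ever created.
    private final ScheduledExecutorService scheduler;

    ConditionalStatsRefresher(long refreshIntervalSeconds, Runnable refreshTask) {
        if (refreshIntervalSeconds > 0) {
            this.scheduler = Executors.newSingleThreadScheduledExecutor();
            this.scheduler.scheduleAtFixedRate(
                    refreshTask, refreshIntervalSeconds, refreshIntervalSeconds, TimeUnit.SECONDS);
        } else {
            // Assumption: -1 (or any non-positive value) disables the refresh.
            this.scheduler = null;
        }
    }

    boolean isEnabled() {
        return scheduler != null;
    }

    @Override
    public void close() {
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
    }
}
```

   With this shape, an offloader constructed with `-1` costs no thread at all, and `close()` stays safe to call in both cases.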

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderMXBeanImpl.java
##########
@@ -0,0 +1,282 @@
+    public LedgerOffloaderMXBeanImpl(String driverName, long refreshIntervalSeconds) {
+        this.driverName = driverName;
+        this.refreshIntervalSeconds = refreshIntervalSeconds;
+        this.scheduler = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("ledger-offloader-metrics"));

Review comment:
       We'd better add `driverName` to the scheduler thread name.
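
   With Netty this could be as small as passing `"ledger-offloader-metrics-" + driverName` as the pool name to `DefaultThreadFactory`. The sketch below shows the same idea with only JDK classes so it is self-contained; `DriverThreadNames`, `forDriver` and the exact name layout are hypothetical, not what the PR has to use.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: builds thread names that carry the offload driver name.
class DriverThreadNames {

    static ThreadFactory forDriver(String driverName) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            // Example result: "ledger-offloader-metrics-aws-s3-1"
            Thread thread = new Thread(runnable,
                    "ledger-offloader-metrics-" + driverName + "-" + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        };
    }
}
```

   A factory like this can be handed to `Executors.newSingleThreadScheduledExecutor(ThreadFactory)`, so a thread dump shows which driver each metrics thread belongs to.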

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
##########
@@ -67,6 +68,11 @@ public OffloadPoliciesImpl getOffloadPolicies() {
         return null;
     }
 
+    @Override
+    public LedgerOffloaderMXBean getStats() {
+        return new LedgerOffloaderMXBeanImpl("NullLedgerOffloader", 60);

Review comment:
       Set `refreshIntervalSeconds = -1` to disable the `NullLedgerOffloader` MXBean generator by default?

##########
File path: tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
##########
@@ -211,13 +229,15 @@ public void run() {
                     throw fileSystemWriteException;
                 }
                 IOUtils.closeStream(dataWriter);
+                mbean.recordOffloadTime(extraMetadata.get(MANAGED_LEDGER_NAME), (System.nanoTime() - start), TimeUnit.NANOSECONDS);

Review comment:
       This metric only includes the latency of reading entries from BookKeeper in one round of offload, because the entry write to FileSystem is async.
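
   One possible way to cover the write as well is to record the elapsed time in the completion callback of the async write. This is only a sketch: it assumes the write is exposed as a `CompletableFuture`, which the FileSystem offloader may not do, and `AsyncOffloadTimer`, `timed` and the `recorder` callback are hypothetical names standing in for the call to `mbean.recordOffloadTime`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.LongConsumer;

// Hypothetical helper: reports elapsed time only after the async write has finished.
class AsyncOffloadTimer {

    static <T> CompletableFuture<T> timed(long startNanos,
                                          CompletableFuture<T> writeFuture,
                                          LongConsumer recorder) {
        // whenComplete runs on success and on failure, so failed writes are timed too.
        return writeFuture.whenComplete(
                (result, error) -> recorder.accept(System.nanoTime() - startNanos));
    }
}
```

   The recorder receives nanoseconds, matching the `TimeUnit.NANOSECONDS` used in the line under review.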



