You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/15 07:00:45 UTC
[pulsar] branch master updated: Offloader metrics (#13833)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 732049fc6ca Offloader metrics (#13833)
732049fc6ca is described below
commit 732049fc6ca1beb046deb43057be2b130736fbca
Author: Tao Jiuming <95...@users.noreply.github.com>
AuthorDate: Fri Apr 15 15:00:38 2022 +0800
Offloader metrics (#13833)
### Motivation
Currently, there are no offload metrics for tiered storage, so it is very hard for us to debug performance issues. For example, we cannot determine why offloading is slow or why reading offloaded data is slow. For the above reasons, we need to add some offload metrics for monitoring.
### Modifications
Add metrics to the offload procedure and the read-offloaded-data procedure, including offloadTime, offloadError, offloadRate, readLedgerLatency, writeStoreLatency, writeStoreError, readOffloadIndexLatency, readOffloadDataLatency, readOffloadRate, and readOffloadError.
---
.../bookkeeper/mledger/LedgerOffloaderFactory.java | 8 +-
.../bookkeeper/mledger/LedgerOffloaderStats.java | 62 ++++
.../mledger/LedgerOffloaderStatsDisable.java | 80 +++++
.../mledger/impl/LedgerOffloaderStatsImpl.java | 352 +++++++++++++++++++++
.../org/apache/pulsar/broker/PulsarService.java | 14 +-
.../prometheus/PrometheusMetricsGenerator.java | 7 +-
.../broker/stats/LedgerOffloaderMetricsTest.java | 187 +++++++++++
.../pulsar/sql/presto/PulsarConnectorCache.java | 17 +-
.../pulsar/sql/presto/PulsarConnectorConfig.java | 36 +++
.../FileSystemLedgerOffloaderFactory.java | 6 +-
.../impl/FileStoreBackedReadHandleImpl.java | 27 +-
.../impl/FileSystemManagedLedgerOffloader.java | 54 +++-
.../offload/filesystem/FileStoreTestBase.java | 6 +-
.../impl/FileSystemManagedLedgerOffloaderTest.java | 36 ++-
.../jcloud/JCloudLedgerOffloaderFactory.java | 6 +-
.../impl/BlobStoreBackedInputStreamImpl.java | 23 ++
.../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 15 +-
.../impl/BlobStoreBackedReadHandleImplV2.java | 12 +-
.../impl/BlobStoreManagedLedgerOffloader.java | 38 ++-
.../impl/BlockAwareSegmentInputStreamImpl.java | 22 +-
...obStoreManagedLedgerOffloaderStreamingTest.java | 7 +-
.../impl/BlobStoreManagedLedgerOffloaderTest.java | 52 ++-
22 files changed, 996 insertions(+), 71 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
index c854996cd16..bb94cee6fde 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
@@ -52,7 +52,8 @@ public interface LedgerOffloaderFactory<T extends LedgerOffloader> {
*/
T create(OffloadPoliciesImpl offloadPolicies,
Map<String, String> userMetadata,
- OrderedScheduler scheduler)
+ OrderedScheduler scheduler,
+ LedgerOffloaderStats offloaderStats)
throws IOException;
/**
@@ -68,8 +69,9 @@ public interface LedgerOffloaderFactory<T extends LedgerOffloader> {
default T create(OffloadPoliciesImpl offloadPolicies,
Map<String, String> userMetadata,
SchemaStorage schemaStorage,
- OrderedScheduler scheduler)
+ OrderedScheduler scheduler,
+ LedgerOffloaderStats offloaderStats)
throws IOException {
- return create(offloadPolicies, userMetadata, scheduler);
+ return create(offloadPolicies, userMetadata, scheduler, offloaderStats);
}
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStats.java
new file mode 100644
index 00000000000..c9330ff9e05
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStats.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.common.annotation.InterfaceAudience;
+import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import org.apache.bookkeeper.mledger.impl.LedgerOffloaderStatsImpl;
+
+
+/**
+ * Management Bean for a {@link LedgerOffloader}.
+ */
+@InterfaceAudience.LimitedPrivate
+@InterfaceStability.Stable
+public interface LedgerOffloaderStats extends AutoCloseable {
+
+ void recordOffloadError(String topic);
+
+ void recordOffloadBytes(String topic, long size);
+
+ void recordReadLedgerLatency(String topic, long latency, TimeUnit unit);
+
+ void recordWriteToStorageError(String topic);
+
+ void recordReadOffloadError(String topic);
+
+ void recordReadOffloadBytes(String topic, long size);
+
+ void recordReadOffloadIndexLatency(String topic, long latency, TimeUnit unit);
+
+ void recordReadOffloadDataLatency(String topic, long latency, TimeUnit unit);
+
+ void recordDeleteOffloadOps(String topic, boolean succeed);
+
+
+ static LedgerOffloaderStats create(boolean exposeManagedLedgerStats, boolean exposeTopicLevelMetrics,
+ ScheduledExecutorService scheduler, int interval) {
+ if (!exposeManagedLedgerStats) {
+ return LedgerOffloaderStatsDisable.INSTANCE;
+ }
+
+ return LedgerOffloaderStatsImpl.getInstance(exposeTopicLevelMetrics, scheduler, interval);
+ }
+}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStatsDisable.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStatsDisable.java
new file mode 100644
index 00000000000..862d466067e
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStatsDisable.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+import java.util.concurrent.TimeUnit;
+
+class LedgerOffloaderStatsDisable implements LedgerOffloaderStats {
+
+ static final LedgerOffloaderStats INSTANCE = new LedgerOffloaderStatsDisable();
+
+ private LedgerOffloaderStatsDisable() {
+
+ }
+
+ @Override
+ public void recordOffloadError(String topic) {
+
+ }
+
+ @Override
+ public void recordOffloadBytes(String topic, long size) {
+
+ }
+
+ @Override
+ public void recordReadLedgerLatency(String topic, long latency, TimeUnit unit) {
+
+ }
+
+ @Override
+ public void recordWriteToStorageError(String topic) {
+
+ }
+
+ @Override
+ public void recordReadOffloadError(String topic) {
+
+ }
+
+ @Override
+ public void recordReadOffloadBytes(String topic, long size) {
+
+ }
+
+ @Override
+ public void recordReadOffloadIndexLatency(String topic, long latency, TimeUnit unit) {
+
+ }
+
+ @Override
+ public void recordReadOffloadDataLatency(String topic, long latency, TimeUnit unit) {
+
+ }
+
+ @Override
+ public void recordDeleteOffloadOps(String topic, boolean succeed) {
+
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderStatsImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderStatsImpl.java
new file mode 100644
index 00000000000..1c21cd56445
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderStatsImpl.java
@@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Summary;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.common.naming.TopicName;
+
+public final class LedgerOffloaderStatsImpl implements LedgerOffloaderStats, Runnable {
+ private static final String TOPIC_LABEL = "topic";
+ private static final String NAMESPACE_LABEL = "namespace";
+ private static final String UNKNOWN = "unknown";
+ private static final String STATUS = "status";
+ private static final String SUCCEED = "succeed";
+ private static final String FAILED = "failed";
+
+ private final boolean exposeTopicLevelMetrics;
+ private final int interval;
+
+ private final Counter offloadError;
+ private final Gauge offloadRate;
+ private final Counter deleteOffloadOps;
+ private final Summary readLedgerLatency;
+ private final Counter writeStorageError;
+ private final Counter readOffloadError;
+ private final Gauge readOffloadRate;
+ private final Summary readOffloadIndexLatency;
+ private final Summary readOffloadDataLatency;
+
+ private final Map<String, Long> topicAccess;
+ private final Map<String, String> topic2Namespace;
+ private final Map<String, Pair<LongAdder, LongAdder>> offloadAndReadOffloadBytesMap;
+
+ final AtomicBoolean closed = new AtomicBoolean(false);
+
+ private LedgerOffloaderStatsImpl(boolean exposeTopicLevelMetrics,
+ ScheduledExecutorService scheduler, int interval) {
+ this.interval = interval;
+ this.exposeTopicLevelMetrics = exposeTopicLevelMetrics;
+ if (null != scheduler) {
+ scheduler.scheduleAtFixedRate(this, interval, interval, TimeUnit.SECONDS);
+ }
+
+ this.topicAccess = new ConcurrentHashMap<>();
+ this.topic2Namespace = new ConcurrentHashMap<>();
+ this.offloadAndReadOffloadBytesMap = new ConcurrentHashMap<>();
+
+ String[] labels = exposeTopicLevelMetrics
+ ? new String[]{NAMESPACE_LABEL, TOPIC_LABEL} : new String[]{NAMESPACE_LABEL};
+
+ this.offloadError = Counter.build("brk_ledgeroffloader_offload_error", "-")
+ .labelNames(labels).create().register();
+ this.offloadRate = Gauge.build("brk_ledgeroffloader_offload_rate", "-")
+ .labelNames(labels).create().register();
+
+ this.readOffloadError = Counter.build("brk_ledgeroffloader_read_offload_error", "-")
+ .labelNames(labels).create().register();
+ this.readOffloadRate = Gauge.build("brk_ledgeroffloader_read_offload_rate", "-")
+ .labelNames(labels).create().register();
+ this.writeStorageError = Counter.build("brk_ledgeroffloader_write_storage_error", "-")
+ .labelNames(labels).create().register();
+
+ this.readOffloadIndexLatency = Summary.build("brk_ledgeroffloader_read_offload_index_latency", "-")
+ .labelNames(labels).create().register();
+ this.readOffloadDataLatency = Summary.build("brk_ledgeroffloader_read_offload_data_latency", "-")
+ .labelNames(labels).create().register();
+ this.readLedgerLatency = Summary.build("brk_ledgeroffloader_read_ledger_latency", "-")
+ .labelNames(labels).create().register();
+
+ String[] deleteOpsLabels = exposeTopicLevelMetrics
+ ? new String[]{NAMESPACE_LABEL, TOPIC_LABEL, STATUS} : new String[]{NAMESPACE_LABEL, STATUS};
+ this.deleteOffloadOps = Counter.build("brk_ledgeroffloader_delete_offload_ops", "-")
+ .labelNames(deleteOpsLabels).create().register();
+ }
+
+
+ private static LedgerOffloaderStats instance;
+ public static synchronized LedgerOffloaderStats getInstance(boolean exposeTopicLevelMetrics,
+ ScheduledExecutorService scheduler, int interval) {
+ if (null == instance) {
+ instance = new LedgerOffloaderStatsImpl(exposeTopicLevelMetrics, scheduler, interval);
+ }
+
+ return instance;
+ }
+
+ @Override
+ public void recordOffloadError(String topic) {
+ String[] labelValues = this.labelValues(topic);
+ this.offloadError.labels(labelValues).inc();
+ this.addOrUpdateTopicAccess(topic);
+ }
+
+ @Override
+ public void recordOffloadBytes(String topic, long size) {
+ topic = StringUtils.isBlank(topic) ? UNKNOWN : topic;
+ Pair<LongAdder, LongAdder> pair = this.offloadAndReadOffloadBytesMap
+ .computeIfAbsent(topic, __ -> new ImmutablePair<>(new LongAdder(), new LongAdder()));
+ pair.getLeft().add(size);
+ this.addOrUpdateTopicAccess(topic);
+ }
+
+ @Override
+ public void recordReadLedgerLatency(String topic, long latency, TimeUnit unit) {
+ String[] labelValues = this.labelValues(topic);
+ this.readLedgerLatency.labels(labelValues).observe(unit.toMicros(latency));
+ this.addOrUpdateTopicAccess(topic);
+ }
+
+ @Override
+ public void recordWriteToStorageError(String topic) {
+ String[] labelValues = this.labelValues(topic);
+ this.writeStorageError.labels(labelValues).inc();
+ this.addOrUpdateTopicAccess(topic);
+ }
+
+ @Override
+ public void recordReadOffloadError(String topic) {
+ String[] labelValues = this.labelValues(topic);
+ this.readOffloadError.labels(labelValues).inc();
+ this.addOrUpdateTopicAccess(topic);
+ }
+
+ @Override
+ public void recordReadOffloadBytes(String topic, long size) {
+ topic = StringUtils.isBlank(topic) ? UNKNOWN : topic;
+ Pair<LongAdder, LongAdder> pair = this.offloadAndReadOffloadBytesMap
+ .computeIfAbsent(topic, __ -> new ImmutablePair<>(new LongAdder(), new LongAdder()));
+ pair.getRight().add(size);
+ this.addOrUpdateTopicAccess(topic);
+ }
+
+ @Override
+ public void recordReadOffloadIndexLatency(String topic, long latency, TimeUnit unit) {
+ String[] labelValues = this.labelValues(topic);
+ this.readOffloadIndexLatency.labels(labelValues).observe(unit.toMicros(latency));
+ this.addOrUpdateTopicAccess(topic);
+ }
+
+ @Override
+ public void recordReadOffloadDataLatency(String topic, long latency, TimeUnit unit) {
+ String[] labelValues = this.labelValues(topic);
+ this.readOffloadDataLatency.labels(labelValues).observe(unit.toMicros(latency));
+ this.addOrUpdateTopicAccess(topic);
+ }
+
+ @Override
+ public void recordDeleteOffloadOps(String topic, boolean succeed) {
+ String status = succeed ? SUCCEED : FAILED;
+ String[] labelValues = this.labelValues(topic, status);
+ this.deleteOffloadOps.labels(labelValues).inc();
+ this.addOrUpdateTopicAccess(topic);
+ }
+
+ private void addOrUpdateTopicAccess(String topic) {
+ topic = StringUtils.isBlank(topic) ? UNKNOWN : topic;
+ this.topicAccess.put(topic, System.currentTimeMillis());
+ }
+
+ private String[] labelValues(String topic, String status) {
+ if (StringUtils.isBlank(topic)) {
+ return exposeTopicLevelMetrics ? new String[]{UNKNOWN, UNKNOWN, status} : new String[]{UNKNOWN, status};
+ }
+ String namespace = this.getNamespace(topic);
+ return this.exposeTopicLevelMetrics ? new String[]{namespace, topic, status} : new String[]{namespace, status};
+ }
+
+ private String[] labelValues(String topic) {
+ if (StringUtils.isBlank(topic)) {
+ return this.exposeTopicLevelMetrics ? new String[]{UNKNOWN, UNKNOWN} : new String[]{UNKNOWN};
+ }
+ String namespace = this.getNamespace(topic);
+ return this.exposeTopicLevelMetrics ? new String[]{namespace, topic} : new String[]{namespace};
+ }
+
+ private String getNamespace(String topic) {
+ return this.topic2Namespace.computeIfAbsent(topic, t -> {
+ try {
+ return TopicName.get(t).getNamespace();
+ } catch (IllegalArgumentException ex) {
+ return UNKNOWN;
+ }
+ });
+ }
+
+ private void cleanExpiredTopicMetrics() {
+ long now = System.currentTimeMillis();
+ long timeout = TimeUnit.MINUTES.toMillis(2);
+
+ topicAccess.entrySet().removeIf(entry -> {
+ String topic = entry.getKey();
+ long access = entry.getValue();
+
+ if (now - access >= timeout) {
+ this.topic2Namespace.remove(topic);
+ this.offloadAndReadOffloadBytesMap.remove(topic);
+ String[] labelValues = this.labelValues(topic);
+ this.offloadError.remove(labelValues);
+ this.offloadRate.remove(labelValues);
+ this.readLedgerLatency.remove(labelValues);
+ this.writeStorageError.remove(labelValues);
+ this.readOffloadError.remove(labelValues);
+ this.readOffloadRate.remove(labelValues);
+ this.readOffloadIndexLatency.remove(labelValues);
+ this.readOffloadDataLatency.remove(labelValues);
+
+ labelValues = this.labelValues(topic, SUCCEED);
+ this.deleteOffloadOps.remove(labelValues);
+ labelValues = this.labelValues(topic, FAILED);
+ this.deleteOffloadOps.remove(labelValues);
+
+ return true;
+ }
+ return false;
+ });
+ }
+
+ @Override
+ public void run() {
+ this.cleanExpiredTopicMetrics();
+
+ this.offloadAndReadOffloadBytesMap.forEach((topic, pair) -> {
+ String[] labelValues = this.labelValues(topic);
+
+ double interval = this.interval;
+ long offloadBytes = pair.getLeft().sumThenReset();
+ long readOffloadBytes = pair.getRight().sumThenReset();
+
+ this.offloadRate.labels(labelValues).set(offloadBytes / interval);
+ this.readOffloadRate.labels(labelValues).set(readOffloadBytes / interval);
+ });
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (instance == this && this.closed.compareAndSet(false, true)) {
+ CollectorRegistry.defaultRegistry.unregister(this.offloadError);
+ CollectorRegistry.defaultRegistry.unregister(this.offloadRate);
+ CollectorRegistry.defaultRegistry.unregister(this.readLedgerLatency);
+ CollectorRegistry.defaultRegistry.unregister(this.writeStorageError);
+ CollectorRegistry.defaultRegistry.unregister(this.readOffloadError);
+ CollectorRegistry.defaultRegistry.unregister(this.readOffloadRate);
+ CollectorRegistry.defaultRegistry.unregister(this.readOffloadIndexLatency);
+ CollectorRegistry.defaultRegistry.unregister(this.readOffloadDataLatency);
+ this.topic2Namespace.clear();
+ this.offloadAndReadOffloadBytesMap.clear();
+ }
+ }
+
+ @VisibleForTesting
+ public long getOffloadBytes(String topic) {
+ if (this.exposeTopicLevelMetrics) {
+ Pair<LongAdder, LongAdder> pair = this.offloadAndReadOffloadBytesMap.get(topic);
+ return pair.getLeft().sum();
+ }
+
+ String namespace = this.topic2Namespace.get(topic);
+ List<String> topics = this.offloadAndReadOffloadBytesMap.keySet().stream()
+ .filter(topicName -> topicName.contains(namespace)).collect(Collectors.toList());
+
+ long totalBytes = 0;
+ for (String key : topics) {
+ totalBytes += this.offloadAndReadOffloadBytesMap.get(key).getLeft().sum();
+ }
+ return totalBytes;
+ }
+
+ @VisibleForTesting
+ public long getOffloadError(String topic) {
+ String[] labels = this.labelValues(topic);
+ return (long) this.offloadError.labels(labels).get();
+ }
+
+ @VisibleForTesting
+ public long getWriteStorageError(String topic) {
+ String[] labels = this.labelValues(topic);
+ return (long) this.writeStorageError.labels(labels).get();
+ }
+
+ @VisibleForTesting
+ public long getReadOffloadError(String topic) {
+ String[] labels = this.labelValues(topic);
+ return (long) this.readOffloadError.labels(labels).get();
+ }
+
+ @VisibleForTesting
+ public long getReadOffloadBytes(String topic) {
+ if (this.exposeTopicLevelMetrics) {
+ Pair<LongAdder, LongAdder> pair = this.offloadAndReadOffloadBytesMap.get(topic);
+ return pair.getRight().sum();
+ }
+
+ String namespace = this.topic2Namespace.get(topic);
+ List<String> topics = this.offloadAndReadOffloadBytesMap.keySet().stream()
+ .filter(topicName -> topicName.contains(namespace)).collect(Collectors.toList());
+
+ long totalBytes = 0;
+ for (String key : topics) {
+ totalBytes += this.offloadAndReadOffloadBytesMap.get(key).getRight().sum();
+ }
+ return totalBytes;
+ }
+
+ @VisibleForTesting
+ public Summary.Child.Value getReadLedgerLatency(String topic) {
+ String[] labels = this.labelValues(topic);
+ return this.readLedgerLatency.labels(labels).get();
+ }
+
+ @VisibleForTesting
+ public Summary.Child.Value getReadOffloadIndexLatency(String topic) {
+ String[] labels = this.labelValues(topic);
+ return this.readOffloadIndexLatency.labels(labels).get();
+ }
+
+ @VisibleForTesting
+ public Summary.Child.Value getReadOffloadDataLatency(String topic) {
+ String[] labels = this.labelValues(topic);
+ return this.readOffloadDataLatency.labels(labels).get();
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 76b0a624037..35cd22d4bbe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -73,6 +73,7 @@ import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.offload.Offloaders;
@@ -204,6 +205,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private OrderedScheduler offloaderScheduler;
private OffloadersCache offloadersCache = new OffloadersCache();
private LedgerOffloader defaultOffloader;
+ private final LedgerOffloaderStats offloaderStats;
private Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap = new ConcurrentHashMap<>();
private ScheduledFuture<?> loadReportTask = null;
private ScheduledFuture<?> loadSheddingTask = null;
@@ -336,6 +338,11 @@ public class PulsarService implements AutoCloseable, ShutdownService {
new ExecutorProvider(1, "broker-client-shared-external-executor");
this.brokerClientSharedTimer =
new HashedWheelTimer(new DefaultThreadFactory("broker-client-shared-timer"), 1, TimeUnit.MILLISECONDS);
+
+ int interval = config.getManagedLedgerStatsPeriodSeconds();
+ boolean exposeTopicMetrics = config.isExposeTopicLevelMetricsInPrometheus();
+ this.offloaderStats = LedgerOffloaderStats.create(config.isExposeManagedLedgerMetricsInPrometheus(),
+ exposeTopicMetrics, this.getOffloaderScheduler(), interval);
}
public MetadataStore createConfigurationMetadataStore() throws MetadataStoreException {
@@ -530,6 +537,9 @@ public class PulsarService implements AutoCloseable, ShutdownService {
if (transactionExecutorProvider != null) {
transactionExecutorProvider.shutdownNow();
}
+ if (this.offloaderStats != null) {
+ this.offloaderStats.close();
+ }
brokerClientSharedExternalExecutorProvider.shutdownNow();
brokerClientSharedInternalExecutorProvider.shutdownNow();
@@ -1259,6 +1269,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
checkNotNull(offloadPolicies.getOffloadersDirectory(),
"Offloader driver is configured to be '%s' but no offloaders directory is configured.",
offloadPolicies.getManagedLedgerOffloadDriver());
+
Offloaders offloaders = offloadersCache.getOrLoadOffloaders(
offloadPolicies.getOffloadersDirectory(), config.getNarExtractionDirectory());
@@ -1272,8 +1283,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha(),
LedgerOffloader.METADATA_PULSAR_CLUSTER_NAME.toLowerCase(), config.getClusterName()
),
- schemaStorage,
- getOffloaderScheduler(offloadPolicies));
+ schemaStorage, getOffloaderScheduler(offloadPolicies), this.offloaderStats);
} catch (IOException ioe) {
throw new PulsarServerException(ioe.getMessage(), ioe.getCause());
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index e0235817f0c..bf2884e2b3a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -38,6 +38,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.pulsar.PulsarVersion;
@@ -53,6 +54,7 @@ import org.apache.pulsar.common.util.SimpleTextOutputStream;
* in a text format suitable to be consumed by Prometheus.
* Format specification can be found at <a href="https://prometheus.io/docs/instrumenting/exposition_formats/">Exposition Formats</a>
*/
+@Slf4j
public class PrometheusMetricsGenerator {
static {
@@ -199,13 +201,16 @@ public class PrometheusMetricsGenerator {
.write("{cluster=\"").write(cluster).write('"');
}
+ //to avoid quantile label duplicated
+ boolean appendedQuantile = false;
for (Map.Entry<String, String> metric : metrics1.getDimensions().entrySet()) {
if (metric.getKey().isEmpty() || "cluster".equals(metric.getKey())) {
continue;
}
stream.write(", ").write(metric.getKey()).write("=\"").write(metric.getValue()).write('"');
- if (value != null && !value.isEmpty()) {
+ if (value != null && !value.isEmpty() && !appendedQuantile) {
stream.write(", ").write("quantile=\"").write(value).write('"');
+ appendedQuantile = true;
}
}
stream.write("} ").write(String.valueOf(entry.getValue()))
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/LedgerOffloaderMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/LedgerOffloaderMetricsTest.java
new file mode 100644
index 00000000000..c6c8f07b868
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/LedgerOffloaderMetricsTest.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.impl.LedgerOffloaderStatsImpl;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.junit.Assert;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+public class LedgerOffloaderMetricsTest extends BrokerTestBase {
+
+ @Override
+ protected void setup() throws Exception {
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test // verifies per-TOPIC aggregation: with topic-level metrics enabled, each topic keeps independent counters
+ public void testTopicLevelMetrics() throws Exception {
+ conf.setExposeTopicLevelMetricsInPrometheus(true); // stats are keyed by full topic name, not namespace
+ super.baseSetup();
+
+ String ns1 = "prop/ns-abc1";
+ admin.namespaces().createNamespace(ns1);
+ String []topics = new String[3];
+
+ LedgerOffloaderStatsImpl offloaderStats = (LedgerOffloaderStatsImpl) pulsar.getOffloaderStats();
+
+ LedgerOffloader offloader = Mockito.mock(LedgerOffloader.class); // mocked so no real tiered storage is touched
+ Topic topic = Mockito.mock(PersistentTopic.class);
+ CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
+ Optional<Topic> topicOptional = Optional.of(topic);
+ topicFuture.complete(topicOptional); // pre-completed future returned by the spied broker lookup below
+ BrokerService brokerService = spy(pulsar.getBrokerService());
+ doReturn(brokerService).when(pulsar).getBrokerService(); // route stats' topic lookups through the spy
+
+
+ for (int i = 0; i < 3; i++) {
+ String topicName = "persistent://prop/ns-abc1/testMetrics" + UUID.randomUUID(); // unique name per iteration
+ topics[i] = topicName;
+ admin.topics().createNonPartitionedTopic(topicName);
+
+ doReturn(topicFuture).when(brokerService).getTopicIfExists(topicName);
+ Assert.assertTrue(topic instanceof PersistentTopic); // NOTE(review): trivially true for the mock above — adds no coverage
+
+ ManagedLedger ledgerM = Mockito.mock(ManagedLedger.class);
+ doReturn(ledgerM).when(((PersistentTopic) topic)).getManagedLedger();
+ ManagedLedgerConfig config = Mockito.mock(ManagedLedgerConfig.class);
+ doReturn(config).when(ledgerM).getConfig();
+ doReturn(offloader).when(config).getLedgerOffloader();
+
+ offloaderStats.recordOffloadError(topicName); // record an identical fixed sample against each topic...
+ offloaderStats.recordOffloadError(topicName);
+ offloaderStats.recordOffloadBytes(topicName, 100);
+ offloaderStats.recordReadLedgerLatency(topicName, 1000, TimeUnit.NANOSECONDS);
+ offloaderStats.recordReadOffloadError(topicName);
+ offloaderStats.recordReadOffloadError(topicName);
+ offloaderStats.recordReadOffloadIndexLatency(topicName, 1000000L, TimeUnit.NANOSECONDS);
+ offloaderStats.recordReadOffloadBytes(topicName, 100000);
+ offloaderStats.recordWriteToStorageError(topicName);
+ offloaderStats.recordWriteToStorageError(topicName);
+ }
+
+ for (String topicName : topics) { // ...then assert each topic sees exactly its own sample (no cross-topic bleed)
+ Assert.assertEquals(offloaderStats.getOffloadError(topicName), 2);
+ Assert.assertEquals(offloaderStats.getOffloadBytes(topicName) , 100);
+ Assert.assertEquals((long) offloaderStats.getReadLedgerLatency(topicName).sum, 1); // 1000ns recorded; sum presumably in microseconds — TODO confirm unit
+ Assert.assertEquals(offloaderStats.getReadOffloadError(topicName), 2);
+ Assert.assertEquals((long) offloaderStats.getReadOffloadIndexLatency(topicName).sum ,1000); // 1000000ns — same presumed microsecond unit
+ Assert.assertEquals(offloaderStats.getReadOffloadBytes(topicName), 100000);
+ Assert.assertEquals(offloaderStats.getWriteStorageError(topicName), 2);
+ }
+ }
+
+ @Test // verifies per-NAMESPACE aggregation: with topic-level metrics disabled, stats roll up across a namespace's topics
+ public void testNamespaceLevelMetrics() throws Exception {
+ conf.setExposeTopicLevelMetricsInPrometheus(false); // namespace-level keying
+ super.baseSetup();
+
+ String ns1 = "prop/ns-abc1";
+ String ns2 = "prop/ns-abc2";
+
+ LedgerOffloaderStatsImpl offloaderStats = (LedgerOffloaderStatsImpl) pulsar.getOffloaderStats();
+
+ LedgerOffloader offloader = Mockito.mock(LedgerOffloader.class); // mocked so no real tiered storage is touched
+ Topic topic = Mockito.mock(PersistentTopic.class);
+ CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
+ Optional<Topic> topicOptional = Optional.of(topic);
+ topicFuture.complete(topicOptional); // pre-completed future returned by the spied broker lookup below
+ BrokerService brokerService = spy(pulsar.getBrokerService());
+ doReturn(brokerService).when(pulsar).getBrokerService(); // route stats' topic lookups through the spy
+ Queue<String> queue = new LinkedList<>(); // feeds getName() answers in creation order (see stub below)
+ Map<String, List<String>> namespace2Topics = new HashMap<>();
+ for (int s = 0; s < 2; s++) { // two namespaces, six topics each
+ String nameSpace = ns1;
+ if (s == 1) {
+ nameSpace = ns2;
+ }
+ namespace2Topics.put(nameSpace, new ArrayList<>());
+
+ admin.namespaces().createNamespace(nameSpace);
+ String baseTopic1 = "persistent://" + nameSpace + "/testMetrics";
+ for (int i = 0; i < 6; i++) {
+ String topicName = baseTopic1 + UUID.randomUUID(); // unique name per iteration
+ List<String> topicList = namespace2Topics.get(nameSpace);
+ topicList.add(topicName);
+
+ queue.add(topicName);
+ admin.topics().createNonPartitionedTopic(topicName);
+ doReturn(topicFuture).when(brokerService).getTopicIfExists(topicName);
+ Assert.assertTrue(topic instanceof PersistentTopic); // NOTE(review): trivially true for the mock above — adds no coverage
+
+
+ ManagedLedger ledgerM = Mockito.mock(ManagedLedger.class);
+ doReturn(ledgerM).when(((PersistentTopic) topic)).getManagedLedger();
+ ManagedLedgerConfig config = Mockito.mock(ManagedLedgerConfig.class);
+ doReturn(config).when(ledgerM).getConfig();
+ doReturn(offloader).when(config).getLedgerOffloader();
+ Mockito.when(ledgerM.getName()).thenAnswer((Answer<String>) invocation -> queue.poll()); // hand out each topic name once
+
+ offloaderStats.recordOffloadError(topicName); // one fixed sample per topic, summed per namespace
+ offloaderStats.recordOffloadBytes(topicName, 100);
+ offloaderStats.recordReadLedgerLatency(topicName, 1000, TimeUnit.NANOSECONDS);
+ offloaderStats.recordReadOffloadError(topicName);
+ offloaderStats.recordReadOffloadIndexLatency(topicName, 1000000L, TimeUnit.NANOSECONDS);
+ offloaderStats.recordReadOffloadBytes(topicName, 100000);
+ offloaderStats.recordWriteToStorageError(topicName);
+ }
+ }
+
+ for (Map.Entry<String, List<String>> entry : namespace2Topics.entrySet()) {
+ String namespace = entry.getKey(); // NOTE(review): unused local — consider removing
+ List<String> topics = entry.getValue();
+ String topicName = topics.get(0); // any topic in the namespace resolves to the same aggregated stats
+
+ Assert.assertTrue(offloaderStats.getOffloadError(topicName) >= 1); // >= because six topics contribute to one namespace bucket
+ Assert.assertTrue(offloaderStats.getOffloadBytes(topicName) >= 100);
+ Assert.assertTrue((long) offloaderStats.getReadLedgerLatency(topicName).sum >= 1);
+ Assert.assertTrue(offloaderStats.getReadOffloadError(topicName) >= 1);
+ Assert.assertTrue((long) offloaderStats.getReadOffloadIndexLatency(topicName).sum >= 1000);
+ Assert.assertTrue(offloaderStats.getReadOffloadBytes(topicName) >= 100000);
+ Assert.assertTrue(offloaderStats.getWriteStorageError(topicName) >= 1);
+ }
+ }
+
+}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index 9a64c055d07..f6a4771f9e4 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -30,6 +30,7 @@ import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
@@ -61,6 +62,7 @@ public class PulsarConnectorCache {
private final StatsProvider statsProvider;
private OrderedScheduler offloaderScheduler;
+ private final LedgerOffloaderStats offloaderStats;
private OffloadersCache offloadersCache = new OffloadersCache();
private LedgerOffloader defaultOffloader;
private Map<NamespaceName, LedgerOffloader> offloaderMap = new ConcurrentHashMap<>();
@@ -84,6 +86,14 @@ public class PulsarConnectorCache {
this.statsProvider.start(clientConfiguration);
+ this.initOffloaderScheduler(pulsarConnectorConfig.getOffloadPolices());
+
+ int period = pulsarConnectorConfig.getManagedLedgerStatsPeriodSeconds();
+ boolean exposeTopicLevelMetrics = pulsarConnectorConfig.isExposeTopicLevelMetricsInPrometheus();
+ this.offloaderStats =
+ LedgerOffloaderStats.create(pulsarConnectorConfig.isExposeManagedLedgerMetricsInPrometheus(),
+ exposeTopicLevelMetrics, offloaderScheduler, period);
+
this.defaultOffloader = initManagedLedgerOffloader(
pulsarConnectorConfig.getOffloadPolices(), pulsarConnectorConfig);
}
@@ -142,13 +152,10 @@ public class PulsarConnectorCache {
return managedLedgerConfig;
}
- private synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) {
- if (this.offloaderScheduler == null) {
+ private void initOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) {
this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder()
.numThreads(offloadPolicies.getManagedLedgerOffloadMaxThreads())
.name("pulsar-offloader").build();
- }
- return this.offloaderScheduler;
}
private LedgerOffloader initManagedLedgerOffloader(OffloadPoliciesImpl offloadPolicies,
@@ -171,7 +178,7 @@ public class PulsarConnectorCache {
LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha()
),
- getOffloaderScheduler(offloadPolicies));
+ this.offloaderScheduler, this.offloaderStats);
} catch (IOException ioe) {
log.error("Failed to create offloader: ", ioe);
throw new RuntimeException(ioe.getMessage(), ioe.getCause());
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
index abaf9a6fbce..0e5c2b4e95f 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
@@ -69,6 +69,11 @@ public class PulsarConnectorConfig implements AutoCloseable {
private String offloadersDirectory = "./offloaders";
private Map<String, String> offloaderProperties = new HashMap<>();
+ //--- Ledger metrics ---
+ private boolean exposeTopicLevelMetricsInPrometheus = false;
+ private boolean exposeManagedLedgerMetricsInPrometheus = false;
+ private int managedLedgerStatsPeriodSeconds = 60;
+
private PulsarAdmin pulsarAdmin;
// --- Bookkeeper
@@ -277,6 +282,37 @@ public class PulsarConnectorConfig implements AutoCloseable {
return this;
}
+ @Config("pulsar.expose-topic-level-metrics-in-prometheus") // fluent setter bound to this Presto catalog property
+ public PulsarConnectorConfig setExposeTopicLevelMetricsInPrometheus(boolean exposeTopicLevelMetricsInPrometheus) {
+ this.exposeTopicLevelMetricsInPrometheus = exposeTopicLevelMetricsInPrometheus;
+ return this;
+ }
+
+ public boolean isExposeTopicLevelMetricsInPrometheus() {
+ return exposeTopicLevelMetricsInPrometheus; // defaults to false (field initializer)
+ }
+
+ @Config("pulsar.expose-managed-ledger-metrics-in-prometheus") // fluent setter bound to this Presto catalog property
+ public PulsarConnectorConfig setExposeManagedLedgerMetricsInPrometheus(
+ boolean exposeManagedLedgerMetricsInPrometheus) {
+ this.exposeManagedLedgerMetricsInPrometheus = exposeManagedLedgerMetricsInPrometheus;
+ return this;
+ }
+
+ public boolean isExposeManagedLedgerMetricsInPrometheus() {
+ return exposeManagedLedgerMetricsInPrometheus; // defaults to false (field initializer)
+ }
+
+ @Config("pulsar.managed-ledger-stats-period-seconds") // fluent setter bound to this Presto catalog property
+ public PulsarConnectorConfig setManagedLedgerStatsPeriodSeconds(int managedLedgerStatsPeriodSeconds) {
+ this.managedLedgerStatsPeriodSeconds = managedLedgerStatsPeriodSeconds;
+ return this;
+ }
+
+ public int getManagedLedgerStatsPeriodSeconds() {
+ return managedLedgerStatsPeriodSeconds; // defaults to 60 seconds (field initializer)
+ }
+
// --- Authentication ---
public String getAuthPlugin() {
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java
index 4b32f77bc1f..c7876f9941a 100644
--- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Map;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.offload.filesystem.impl.FileSystemManagedLedgerOffloader;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
@@ -34,7 +35,8 @@ public class FileSystemLedgerOffloaderFactory implements LedgerOffloaderFactory<
@Override
public FileSystemManagedLedgerOffloader create(OffloadPoliciesImpl offloadPolicies,
Map<String, String> userMetadata,
- OrderedScheduler scheduler) throws IOException {
- return FileSystemManagedLedgerOffloader.create(offloadPolicies, scheduler);
+ OrderedScheduler scheduler,
+ LedgerOffloaderStats offloaderStats) throws IOException {
+ return FileSystemManagedLedgerOffloader.create(offloadPolicies, scheduler, offloaderStats);
}
}
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
index 5221b15d5d9..ecc0cfc23b3 100644
--- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -35,6 +36,7 @@ import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
@@ -47,18 +49,25 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
private final MapFile.Reader reader;
private final long ledgerId;
private final LedgerMetadata ledgerMetadata;
+ private final LedgerOffloaderStats offloaderStats;
+ private final String managedLedgerName;
- private FileStoreBackedReadHandleImpl(ExecutorService executor,
- MapFile.Reader reader,
- long ledgerId) throws IOException {
+ private FileStoreBackedReadHandleImpl(ExecutorService executor, MapFile.Reader reader, long ledgerId,
+ LedgerOffloaderStats offloaderStats,
+ String managedLedgerName) throws IOException {
this.ledgerId = ledgerId;
this.executor = executor;
this.reader = reader;
+ this.offloaderStats = offloaderStats;
+ this.managedLedgerName = managedLedgerName;
LongWritable key = new LongWritable();
BytesWritable value = new BytesWritable();
try {
key.set(FileSystemManagedLedgerOffloader.METADATA_KEY_INDEX);
+ long startReadIndexTime = System.nanoTime();
reader.get(key, value);
+ offloaderStats.recordReadOffloadIndexLatency(managedLedgerName,
+ System.nanoTime() - startReadIndexTime, TimeUnit.NANOSECONDS);
this.ledgerMetadata = parseLedgerMetadata(ledgerId, value.copyBytes());
} catch (IOException e) {
log.error("Fail to read LedgerMetadata for ledgerId {}",
@@ -113,7 +122,10 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
key.set(nextExpectedId - 1);
reader.seek(key);
while (entriesToRead > 0) {
+ long startReadTime = System.nanoTime();
reader.next(key, value);
+ this.offloaderStats.recordReadOffloadDataLatency(managedLedgerName,
+ System.nanoTime() - startReadTime, TimeUnit.NANOSECONDS);
int length = value.getLength();
long entryId = key.get();
if (entryId == nextExpectedId) {
@@ -122,6 +134,7 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
buf.writeBytes(value.copyBytes());
entriesToRead--;
nextExpectedId++;
+ this.offloaderStats.recordReadOffloadBytes(managedLedgerName, length);
} else if (entryId > lastEntry) {
log.info("Expected to read {}, but read {}, which is greater than last entry {}",
nextExpectedId, entryId, lastEntry);
@@ -130,6 +143,7 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
}
promise.complete(LedgerEntriesImpl.create(entries));
} catch (Throwable t) {
+ this.offloaderStats.recordReadOffloadError(managedLedgerName);
promise.completeExceptionally(t);
entries.forEach(LedgerEntry::close);
}
@@ -176,9 +190,8 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
return promise;
}
- public static ReadHandle open(ScheduledExecutorService executor,
- MapFile.Reader reader,
- long ledgerId) throws IOException {
- return new FileStoreBackedReadHandleImpl(executor, reader, ledgerId);
+ public static ReadHandle open(ScheduledExecutorService executor, MapFile.Reader reader, long ledgerId,
+ LedgerOffloaderStats offloaderStats, String managedLedgerName) throws IOException {
+ return new FileStoreBackedReadHandleImpl(executor, reader, ledgerId, offloaderStats, managedLedgerName);
}
}
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
index 02f1e62426a..d91d84b476b 100644
--- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
@@ -29,12 +29,14 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.offload.filesystem.FileSystemLedgerOffloaderFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -62,6 +64,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
private static final long ENTRIES_PER_READ = 100;
private OrderedScheduler assignmentScheduler;
private OffloadPoliciesImpl offloadPolicies;
+ private final LedgerOffloaderStats offloaderStats;
public static boolean driverSupported(String driver) {
return DRIVER_NAMES.equals(driver);
@@ -73,11 +76,13 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
}
public static FileSystemManagedLedgerOffloader create(OffloadPoliciesImpl conf,
- OrderedScheduler scheduler) throws IOException {
- return new FileSystemManagedLedgerOffloader(conf, scheduler);
+ OrderedScheduler scheduler,
+ LedgerOffloaderStats offloaderStats) throws IOException {
+ return new FileSystemManagedLedgerOffloader(conf, scheduler, offloaderStats);
}
- private FileSystemManagedLedgerOffloader(OffloadPoliciesImpl conf, OrderedScheduler scheduler) throws IOException {
+ private FileSystemManagedLedgerOffloader(OffloadPoliciesImpl conf, OrderedScheduler scheduler,
+ LedgerOffloaderStats offloaderStats) throws IOException {
this.offloadPolicies = conf;
this.configuration = new Configuration();
if (conf.getFileSystemProfilePath() != null) {
@@ -105,13 +110,15 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
this.assignmentScheduler = OrderedScheduler.newSchedulerBuilder()
.numThreads(conf.getManagedLedgerOffloadMaxThreads())
.name("offload-assignment").build();
+ this.offloaderStats = offloaderStats;
}
@VisibleForTesting
public FileSystemManagedLedgerOffloader(OffloadPoliciesImpl conf,
OrderedScheduler scheduler,
String testHDFSPath,
- String baseDir) throws IOException {
+ String baseDir,
+ LedgerOffloaderStats offloaderStats) throws IOException {
this.offloadPolicies = conf;
this.configuration = new Configuration();
this.configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
@@ -125,6 +132,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
this.assignmentScheduler = OrderedScheduler.newSchedulerBuilder()
.numThreads(conf.getManagedLedgerOffloadMaxThreads())
.name("offload-assignment").build();
+ this.offloaderStats = offloaderStats;
}
@Override
@@ -142,8 +150,9 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
public CompletableFuture<Void> offload(ReadHandle readHandle, UUID uuid, Map<String, String> extraMetadata) {
CompletableFuture<Void> promise = new CompletableFuture<>();
scheduler.chooseThread(readHandle.getId()).submit(
- new LedgerReader(readHandle, uuid, extraMetadata, promise, storageBasePath,
- configuration, assignmentScheduler, offloadPolicies.getManagedLedgerOffloadPrefetchRounds()));
+ new LedgerReader(readHandle, uuid, extraMetadata, promise, storageBasePath, configuration,
+ assignmentScheduler, offloadPolicies.getManagedLedgerOffloadPrefetchRounds(),
+ this.offloaderStats));
return promise;
}
@@ -158,6 +167,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
volatile Exception fileSystemWriteException = null;
private OrderedScheduler assignmentScheduler;
private int managedLedgerOffloadPrefetchRounds = 1;
+ private final LedgerOffloaderStats offloaderStats;
private LedgerReader(ReadHandle readHandle,
UUID uuid,
@@ -166,7 +176,8 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
String storageBasePath,
Configuration configuration,
OrderedScheduler assignmentScheduler,
- int managedLedgerOffloadPrefetchRounds) {
+ int managedLedgerOffloadPrefetchRounds,
+ LedgerOffloaderStats offloaderStats) {
this.readHandle = readHandle;
this.uuid = uuid;
this.extraMetadata = extraMetadata;
@@ -175,6 +186,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
this.configuration = configuration;
this.assignmentScheduler = assignmentScheduler;
this.managedLedgerOffloadPrefetchRounds = managedLedgerOffloadPrefetchRounds;
+ this.offloaderStats = offloaderStats;
}
@Override
@@ -185,7 +197,8 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
return;
}
long ledgerId = readHandle.getId();
- String storagePath = getStoragePath(storageBasePath, extraMetadata.get(MANAGED_LEDGER_NAME));
+ final String topicName = extraMetadata.get(MANAGED_LEDGER_NAME);
+ String storagePath = getStoragePath(storageBasePath, topicName);
String dataFilePath = getDataFilePath(storagePath, ledgerId, uuid);
LongWritable key = new LongWritable();
BytesWritable value = new BytesWritable();
@@ -208,7 +221,10 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
long end = Math.min(needToOffloadFirstEntryNumber + ENTRIES_PER_READ - 1,
readHandle.getLastAddConfirmed());
log.debug("read ledger entries. start: {}, end: {}", needToOffloadFirstEntryNumber, end);
+ long startReadTime = System.nanoTime();
LedgerEntries ledgerEntriesOnce = readHandle.readAsync(needToOffloadFirstEntryNumber, end).get();
+ long cost = System.nanoTime() - startReadTime;
+ this.offloaderStats.recordReadLedgerLatency(topicName, cost, TimeUnit.NANOSECONDS);
semaphore.acquire();
countDownLatch = new CountDownLatch(1);
assignmentScheduler.chooseThread(ledgerId)
@@ -225,10 +241,11 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
promise.complete(null);
} catch (Exception e) {
log.error("Exception when get CompletableFuture<LedgerEntries> : ManagerLedgerName: {}, "
- + "LedgerId: {}, UUID: {} ", extraMetadata.get(MANAGED_LEDGER_NAME), ledgerId, uuid, e);
+ + "LedgerId: {}, UUID: {} ", topicName, ledgerId, uuid, e);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
+ this.offloaderStats.recordOffloadError(topicName);
promise.completeExceptionally(e);
}
}
@@ -288,6 +305,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
@Override
public void run() {
+ String managedLedgerName = ledgerReader.extraMetadata.get(MANAGED_LEDGER_NAME);
if (ledgerReader.fileSystemWriteException == null) {
Iterator<LedgerEntry> iterator = ledgerEntriesOnce.iterator();
while (iterator.hasNext()) {
@@ -299,9 +317,11 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
dataWriter.append(key, value);
} catch (IOException e) {
ledgerReader.fileSystemWriteException = e;
+ ledgerReader.offloaderStats.recordWriteToStorageError(managedLedgerName);
break;
}
haveOffloadEntryNumber.incrementAndGet();
+ ledgerReader.offloaderStats.recordOffloadBytes(managedLedgerName, entry.getLength());
}
}
countDownLatch.countDown();
@@ -315,19 +335,19 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uuid,
Map<String, String> offloadDriverMetadata) {
+ final String ledgerName = offloadDriverMetadata.get(MANAGED_LEDGER_NAME);
CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
- String storagePath = getStoragePath(storageBasePath, offloadDriverMetadata.get(MANAGED_LEDGER_NAME));
+ String storagePath = getStoragePath(storageBasePath, ledgerName);
String dataFilePath = getDataFilePath(storagePath, ledgerId, uuid);
scheduler.chooseThread(ledgerId).submit(() -> {
try {
MapFile.Reader reader = new MapFile.Reader(new Path(dataFilePath),
configuration);
- promise.complete(FileStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
- reader,
- ledgerId));
+ promise.complete(FileStoreBackedReadHandleImpl.open(
+ scheduler.chooseThread(ledgerId), reader, ledgerId, this.offloaderStats, ledgerName));
} catch (Throwable t) {
log.error("Failed to open FileStoreBackedReadHandleImpl: ManagerLedgerName: {}, "
- + "LegerId: {}, UUID: {}", offloadDriverMetadata.get(MANAGED_LEDGER_NAME), ledgerId, uuid, t);
+ + "LegerId: {}, UUID: {}", ledgerName, ledgerId, uuid, t);
promise.completeExceptionally(t);
}
});
@@ -344,7 +364,8 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
@Override
public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid, Map<String, String> offloadDriverMetadata) {
- String storagePath = getStoragePath(storageBasePath, offloadDriverMetadata.get(MANAGED_LEDGER_NAME));
+ String ledgerName = offloadDriverMetadata.get(MANAGED_LEDGER_NAME);
+ String storagePath = getStoragePath(storageBasePath, ledgerName);
String dataFilePath = getDataFilePath(storagePath, ledgerId, uid);
CompletableFuture<Void> promise = new CompletableFuture<>();
try {
@@ -354,7 +375,8 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
log.error("Failed to delete Offloaded: ", e);
promise.completeExceptionally(e);
}
- return promise;
+ return promise.whenComplete((__, t) ->
+ this.offloaderStats.recordDeleteOffloadOps(ledgerName, t == null));
}
@Override
diff --git a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
index 342bce601e8..b3a02a4f7f6 100644
--- a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
+++ b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
@@ -19,6 +19,7 @@
package org.apache.bookkeeper.mledger.offload.filesystem;
import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.offload.filesystem.impl.FileSystemManagedLedgerOffloader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -30,6 +31,7 @@ import org.testng.annotations.BeforeMethod;
import java.io.File;
import java.nio.file.Files;
import java.util.Properties;
+import java.util.concurrent.Executors;
public abstract class FileStoreTestBase {
protected FileSystemManagedLedgerOffloader fileSystemManagedLedgerOffloader;
@@ -37,6 +39,7 @@ public abstract class FileStoreTestBase {
protected final String basePath = "pulsar";
private MiniDFSCluster hdfsCluster;
private String hdfsURI;
+ protected LedgerOffloaderStats offloaderStats;
@BeforeMethod(alwaysRun = true)
public void start() throws Exception {
@@ -48,9 +51,10 @@ public abstract class FileStoreTestBase {
hdfsURI = "hdfs://localhost:"+ hdfsCluster.getNameNodePort() + "/";
Properties properties = new Properties();
+ this.offloaderStats = LedgerOffloaderStats.create(true, true, Executors.newScheduledThreadPool(1), 60);
fileSystemManagedLedgerOffloader = new FileSystemManagedLedgerOffloader(
OffloadPoliciesImpl.create(properties),
- scheduler, hdfsURI, basePath);
+ scheduler, hdfsURI, basePath, offloaderStats);
}
@AfterMethod(alwaysRun = true)
diff --git a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
index 3600e9e5c57..3f9dbb35551 100644
--- a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
+++ b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
@@ -27,12 +27,13 @@ import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.impl.LedgerOffloaderStatsImpl;
import org.apache.bookkeeper.mledger.offload.filesystem.FileStoreTestBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-
import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
@@ -45,7 +46,7 @@ import static org.testng.Assert.assertTrue;
public class FileSystemManagedLedgerOffloaderTest extends FileStoreTestBase {
private final PulsarMockBookKeeper bk;
- private String topic = "public/default/persistent/testOffload";
+ private String topic = "public/default/testOffload";
private String storagePath = createStoragePath(topic);
private LedgerHandle lh;
private ReadHandle toWrite;
@@ -76,6 +77,12 @@ public class FileSystemManagedLedgerOffloaderTest extends FileStoreTestBase {
.withPassword("foobar".getBytes()).withDigestType(DigestType.CRC32).execute().get();
}
+ @BeforeMethod(alwaysRun = true)
+ @Override
+ public void start() throws Exception {
+ super.start();
+ }
+
@Test
public void testOffloadAndRead() throws Exception {
LedgerOffloader offloader = fileSystemManagedLedgerOffloader;
@@ -111,6 +118,31 @@ public class FileSystemManagedLedgerOffloaderTest extends FileStoreTestBase {
}
}
+ @Test
+ public void testOffloadAndReadMetrics() throws Exception {
+ LedgerOffloader offloader = fileSystemManagedLedgerOffloader;
+ UUID uuid = UUID.randomUUID();
+ offloader.offload(toWrite, uuid, map).get();
+
+ LedgerOffloaderStatsImpl offloaderStats = (LedgerOffloaderStatsImpl) this.offloaderStats;
+ assertTrue(offloaderStats.getOffloadError(topic) == 0);
+ assertTrue(offloaderStats.getOffloadBytes(topic) > 0);
+ assertTrue(offloaderStats.getReadLedgerLatency(topic).count > 0);
+ assertTrue(offloaderStats.getWriteStorageError(topic) == 0);
+
+ ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, map).get();
+ LedgerEntries toTestEntries = toTest.read(0, numberOfEntries - 1);
+ Iterator<LedgerEntry> toTestIter = toTestEntries.iterator();
+ while (toTestIter.hasNext()) {
+ LedgerEntry toTestEntry = toTestIter.next();
+ }
+
+ assertTrue(offloaderStats.getReadOffloadError(topic) == 0);
+ assertTrue(offloaderStats.getReadOffloadBytes(topic) > 0);
+ assertTrue(offloaderStats.getReadOffloadDataLatency(topic).count > 0);
+ assertTrue(offloaderStats.getReadOffloadIndexLatency(topic).count > 0);
+ }
+
@Test
public void testDeleteOffload() throws Exception {
LedgerOffloader offloader = fileSystemManagedLedgerOffloader;
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
index c74799ebac0..c7293696c2f 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Map;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
@@ -45,10 +46,11 @@ public class JCloudLedgerOffloaderFactory implements LedgerOffloaderFactory<Blob
@Override
public BlobStoreManagedLedgerOffloader create(OffloadPoliciesImpl offloadPolicies, Map<String, String> userMetadata,
- OrderedScheduler scheduler) throws IOException {
+ OrderedScheduler scheduler,
+ LedgerOffloaderStats offloaderStats) throws IOException {
TieredStorageConfiguration config =
TieredStorageConfiguration.create(offloadPolicies.toProperties());
- return BlobStoreManagedLedgerOffloader.create(config, userMetadata, scheduler);
+ return BlobStoreManagedLedgerOffloader.create(config, userMetadata, scheduler, offloaderStats);
}
}
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
index e905884ddd0..61d8d0606f3 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
@@ -21,6 +21,8 @@ package org.apache.bookkeeper.mledger.offload.jcloud.impl;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.io.InputStream;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -40,6 +42,8 @@ public class BlobStoreBackedInputStreamImpl extends BackedInputStream {
private final ByteBuf buffer;
private final long objectLen;
private final int bufferSize;
+ private LedgerOffloaderStats offloaderStats;
+ private String managedLedgerName;
private long cursor;
private long bufferOffsetStart;
@@ -59,6 +63,16 @@ public class BlobStoreBackedInputStreamImpl extends BackedInputStream {
this.bufferOffsetStart = this.bufferOffsetEnd = -1;
}
+
+ public BlobStoreBackedInputStreamImpl(BlobStore blobStore, String bucket, String key,
+ VersionCheck versionCheck,
+ long objectLen, int bufferSize,
+ LedgerOffloaderStats offloaderStats, String managedLedgerName) {
+ this(blobStore, bucket, key, versionCheck, objectLen, bufferSize);
+ this.offloaderStats = offloaderStats;
+ this.managedLedgerName = managedLedgerName;
+ }
+
/**
* Refill the buffered input if it is empty.
* @return true if there are bytes to read, false otherwise
@@ -73,7 +87,13 @@ public class BlobStoreBackedInputStreamImpl extends BackedInputStream {
objectLen - 1);
try {
+ long startReadTime = System.nanoTime();
Blob blob = blobStore.getBlob(bucket, key, new GetOptions().range(startRange, endRange));
+ if (this.offloaderStats != null) {
+ this.offloaderStats.recordReadOffloadDataLatency(managedLedgerName,
+ System.nanoTime() - startReadTime, TimeUnit.NANOSECONDS);
+ this.offloaderStats.recordReadOffloadBytes(managedLedgerName, endRange - startRange + 1);
+ }
versionCheck.check(key, blob);
try (InputStream stream = blob.getPayload().openStream()) {
@@ -88,6 +108,9 @@ public class BlobStoreBackedInputStreamImpl extends BackedInputStream {
cursor += buffer.readableBytes();
}
} catch (Throwable e) {
+ if (null != this.offloaderStats) {
+ this.offloaderStats.recordReadOffloadError(this.managedLedgerName);
+ }
throw new IOException("Error reading from BlobStore", e);
}
}
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index e129a11fe43..998912a30c4 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -35,6 +36,7 @@ import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
@@ -62,8 +64,7 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
private State state = null;
private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
- BackedInputStream inputStream,
- ExecutorService executor) {
+ BackedInputStream inputStream, ExecutorService executor) {
this.ledgerId = ledgerId;
this.index = index;
this.inputStream = inputStream;
@@ -222,7 +223,8 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
public static ReadHandle open(ScheduledExecutorService executor,
BlobStore blobStore, String bucket, String key, String indexKey,
VersionCheck versionCheck,
- long ledgerId, int readBufferSize)
+ long ledgerId, int readBufferSize,
+ LedgerOffloaderStats offloaderStats, String managedLedgerName)
throws IOException {
int retryCount = 3;
OffloadIndexBlock index = null;
@@ -233,7 +235,10 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
// If we use a backoff to control the retry, it will introduce a concurrent operation.
// We don't want to make it complicated, because in the most of case it shouldn't in the retry loop.
while (retryCount-- > 0) {
+ long readIndexStartTime = System.nanoTime();
Blob blob = blobStore.getBlob(bucket, indexKey);
+ offloaderStats.recordReadOffloadIndexLatency(managedLedgerName,
+ System.nanoTime() - readIndexStartTime, TimeUnit.NANOSECONDS);
versionCheck.check(indexKey, blob);
OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create();
try (InputStream payLoadStream = blob.getPayload().openStream()) {
@@ -253,9 +258,7 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
}
BackedInputStream inputStream = new BlobStoreBackedInputStreamImpl(blobStore, bucket, key,
- versionCheck,
- index.getDataObjectLength(),
- readBufferSize);
+ versionCheck, index.getDataObjectLength(), readBufferSize, offloaderStats, managedLedgerName);
return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, executor);
}
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
index a6f66132c4a..e1a8296a20e 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import lombok.val;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
@@ -38,6 +39,7 @@ import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2Builder;
@@ -274,7 +276,8 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
public static ReadHandle open(ScheduledExecutorService executor,
BlobStore blobStore, String bucket, List<String> keys, List<String> indexKeys,
VersionCheck versionCheck,
- long ledgerId, int readBufferSize)
+ long ledgerId, int readBufferSize, LedgerOffloaderStats offloaderStats,
+ String managedLedgerName)
throws IOException {
List<BackedInputStream> inputStreams = new LinkedList<>();
List<OffloadIndexBlockV2> indice = new LinkedList<>();
@@ -282,7 +285,10 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
String indexKey = indexKeys.get(i);
String key = keys.get(i);
log.debug("open bucket: {} index key: {}", bucket, indexKey);
+ long startTime = System.nanoTime();
Blob blob = blobStore.getBlob(bucket, indexKey);
+ offloaderStats.recordReadOffloadIndexLatency(managedLedgerName,
+ System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
log.debug("indexKey blob: {} {}", indexKey, blob);
versionCheck.check(indexKey, blob);
OffloadIndexBlockV2Builder indexBuilder = OffloadIndexBlockV2Builder.create();
@@ -292,9 +298,7 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
}
BackedInputStream inputStream = new BlobStoreBackedInputStreamImpl(blobStore, bucket, key,
- versionCheck,
- index.getDataObjectLength(),
- readBufferSize);
+ versionCheck, index.getDataObjectLength(), readBufferSize, offloaderStats, managedLedgerName);
inputStreams.add(inputStream);
indice.add(index);
}
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index 9aa7ecde660..1fdc0983b0e 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -45,6 +45,7 @@ import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.LedgerOffloader.OffloadHandle.OfferEntryResult;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.OffloadedLedgerMetadata;
@@ -91,6 +92,8 @@ import org.jclouds.io.payloads.InputStreamPayload;
@Slf4j
public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
+ private static final String MANAGED_LEDGER_NAME = "ManagedLedgerName";
+
private final OrderedScheduler scheduler;
private final TieredStorageConfiguration config;
private final Location writeLocation;
@@ -113,16 +116,18 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
private final int streamingBlockSize;
private volatile ManagedLedger ml;
private OffloadIndexBlockV2Builder streamingIndexBuilder;
+ private final LedgerOffloaderStats offloaderStats;
public static BlobStoreManagedLedgerOffloader create(TieredStorageConfiguration config,
Map<String, String> userMetadata,
- OrderedScheduler scheduler) throws IOException {
+ OrderedScheduler scheduler,
+ LedgerOffloaderStats offloaderStats) throws IOException {
- return new BlobStoreManagedLedgerOffloader(config, scheduler, userMetadata);
+ return new BlobStoreManagedLedgerOffloader(config, scheduler, userMetadata, offloaderStats);
}
BlobStoreManagedLedgerOffloader(TieredStorageConfiguration config, OrderedScheduler scheduler,
- Map<String, String> userMetadata) {
+ Map<String, String> userMetadata, LedgerOffloaderStats offloaderStats) {
this.scheduler = scheduler;
this.userMetadata = userMetadata;
@@ -149,6 +154,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
config.getBucket(), config.getRegion());
blobStores.putIfAbsent(config.getBlobStoreLocation(), config.getBlobStore());
+ this.offloaderStats = offloaderStats;
log.info("The ledger offloader was created.");
}
@@ -170,6 +176,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
public CompletableFuture<Void> offload(ReadHandle readHandle,
UUID uuid,
Map<String, String> extraMetadata) {
+ final String topicName = extraMetadata.get(MANAGED_LEDGER_NAME);
final BlobStore writeBlobStore = blobStores.get(config.getBlobStoreLocation());
log.info("offload {} uuid {} extraMetadata {} to {} {}", readHandle.getId(), uuid, extraMetadata,
config.getBlobStoreLocation(), writeBlobStore);
@@ -212,13 +219,14 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
try {
long startEntry = 0;
int partId = 1;
+ long start = System.nanoTime();
long entryBytesWritten = 0;
while (startEntry <= readHandle.getLastAddConfirmed()) {
int blockSize = BlockAwareSegmentInputStreamImpl
.calculateBlockSize(config.getMaxBlockSizeInBytes(), readHandle, startEntry, entryBytesWritten);
try (BlockAwareSegmentInputStream blockStream = new BlockAwareSegmentInputStreamImpl(
- readHandle, startEntry, blockSize)) {
+ readHandle, startEntry, blockSize, this.offloaderStats, topicName)) {
Payload partPayload = Payloads.newInputStreamPayload(blockStream);
partPayload.getContentMetadata().setContentLength((long) blockSize);
@@ -237,6 +245,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
}
entryBytesWritten += blockStream.getBlockEntryBytesCount();
partId++;
+ this.offloaderStats.recordOffloadBytes(topicName, blockStream.getBlockEntryBytesCount());
}
dataObjectLength += blockSize;
@@ -254,6 +263,8 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
log.error("Failed abortMultipartUpload in bucket - {} with key - {}, uploadId - {}.",
config.getBucket(), dataBlockKey, mpu.id(), throwable);
}
+ this.offloaderStats.recordWriteToStorageError(topicName);
+ this.offloaderStats.recordOffloadError(topicName);
promise.completeExceptionally(t);
return;
}
@@ -277,7 +288,6 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
.payload(indexPayload)
.contentLength((long) indexStream.getStreamSize())
.build();
-
writeBlobStore.putBlob(config.getBucket(), blob);
promise.complete(null);
} catch (Throwable t) {
@@ -287,6 +297,9 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
log.error("Failed deleteObject in bucket - {} with key - {}.",
config.getBucket(), dataBlockKey, throwable);
}
+
+ this.offloaderStats.recordWriteToStorageError(topicName);
+ this.offloaderStats.recordOffloadError(topicName);
promise.completeExceptionally(t);
return;
}
@@ -532,7 +545,8 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
readBlobstore,
readBucket, key, indexKey,
DataBlockUtils.VERSION_CHECK,
- ledgerId, config.getReadBufferSizeInBytes()));
+ ledgerId, config.getReadBufferSizeInBytes(),
+ this.offloaderStats, offloadDriverMetadata.get(MANAGED_LEDGER_NAME)));
} catch (Throwable t) {
log.error("Failed readOffloaded: ", t);
promise.completeExceptionally(t);
@@ -565,7 +579,8 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
readBlobstore,
readBucket, keys, indexKeys,
DataBlockUtils.VERSION_CHECK,
- ledgerId, config.getReadBufferSizeInBytes()));
+ ledgerId, config.getReadBufferSizeInBytes(),
+ this.offloaderStats, offloadDriverMetadata.get(MANAGED_LEDGER_NAME)));
} catch (Throwable t) {
log.error("Failed readOffloaded: ", t);
promise.completeExceptionally(t);
@@ -594,7 +609,11 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
}
});
- return promise;
+ return promise.whenComplete((__, t) -> {
+ if (null != this.ml) {
+ this.offloaderStats.recordDeleteOffloadOps(this.ml.getName(), t == null);
+ }
+ });
}
@Override
@@ -616,7 +635,8 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
}
});
- return promise;
+ return promise.whenComplete((__, t) ->
+ this.offloaderStats.recordDeleteOffloadOps(this.ml.getName(), t == null));
}
@Override
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
index a4ffdea6509..da1f92438f0 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
@@ -27,9 +27,11 @@ import java.io.InputStream;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.offload.jcloud.BlockAwareSegmentInputStream;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.slf4j.Logger;
@@ -65,6 +67,8 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
static final int ENTRY_HEADER_SIZE = 4 /* entry size */ + 8 /* entry id */;
// Keep a list of all entries ByteBuf, each ByteBuf contains 2 buf: entry header and entry content.
private List<ByteBuf> entriesByteBuf = null;
+ private LedgerOffloaderStats offloaderStats;
+ private String topicName;
public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, int blockSize) {
this.ledger = ledger;
@@ -76,6 +80,13 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
this.entriesByteBuf = Lists.newLinkedList();
}
+ public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, int blockSize,
+ LedgerOffloaderStats offloaderStats, String ledgerName) {
+ this(ledger, startEntryId, blockSize);
+ this.offloaderStats = offloaderStats;
+ this.topicName = ledgerName;
+ }
+
// read ledger entries.
private int readEntries() throws IOException {
checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset());
@@ -113,11 +124,18 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
private List<ByteBuf> readNextEntriesFromLedger(long start, long maxNumberEntries) throws IOException {
long end = Math.min(start + maxNumberEntries - 1, ledger.getLastAddConfirmed());
+ long startTime = System.nanoTime();
try (LedgerEntries ledgerEntriesOnce = ledger.readAsync(start, end).get()) {
- log.debug("read ledger entries. start: {}, end: {}", start, end);
+ if (log.isDebugEnabled()) {
+ log.debug("read ledger entries. start: {}, end: {} cost {}", start, end,
+ TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startTime));
+ }
+ if (offloaderStats != null && topicName != null) {
+ offloaderStats.recordReadLedgerLatency(topicName, System.nanoTime() - startTime,
+ TimeUnit.NANOSECONDS);
+ }
List<ByteBuf> entries = Lists.newLinkedList();
-
Iterator<LedgerEntry> iterator = ledgerEntriesOnce.iterator();
while (iterator.hasNext()) {
LedgerEntry entry = iterator.next();
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
index 32ad4980f46..952e758f0e0 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
@@ -34,6 +34,7 @@ import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.LedgerOffloader.OffloadHandle;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
@@ -52,6 +53,7 @@ public class BlobStoreManagedLedgerOffloaderStreamingTest extends BlobStoreManag
private static final Logger log = LoggerFactory.getLogger(BlobStoreManagedLedgerOffloaderStreamingTest.class);
private TieredStorageConfiguration mockedConfig;
private static final Random random = new Random();
+ private final LedgerOffloaderStats offloaderStats;
BlobStoreManagedLedgerOffloaderStreamingTest() throws Exception {
super();
@@ -60,6 +62,7 @@ public class BlobStoreManagedLedgerOffloaderStreamingTest extends BlobStoreManag
assertNotNull(provider);
provider.validate(config);
blobStore = provider.getBlobStore(config);
+ this.offloaderStats = LedgerOffloaderStats.create(false, false, null, 0);
}
private BlobStoreManagedLedgerOffloader getOffloader(Map<String, String> additionalConfig) throws IOException {
@@ -76,7 +79,7 @@ public class BlobStoreManagedLedgerOffloaderStreamingTest extends BlobStoreManag
mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket, additionalConfig)));
Mockito.doReturn(blobStore).when(mockedConfig).getBlobStore(); // Use the REAL blobStore
BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader
- .create(mockedConfig, new HashMap<String, String>(), scheduler);
+ .create(mockedConfig, new HashMap<String, String>(), scheduler, this.offloaderStats);
return offloader;
}
@@ -85,7 +88,7 @@ public class BlobStoreManagedLedgerOffloaderStreamingTest extends BlobStoreManag
mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket, additionalConfig)));
Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore();
BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader
- .create(mockedConfig, new HashMap<String, String>(), scheduler);
+ .create(mockedConfig, new HashMap<String, String>(), scheduler, this.offloaderStats);
return offloader;
}
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index c607e79ddc2..74d03b11dba 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -23,10 +23,9 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
-import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.fail;
-
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -38,12 +37,14 @@ import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-
+import java.util.concurrent.Executors;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
+import org.apache.bookkeeper.mledger.impl.LedgerOffloaderStatsImpl;
import org.apache.bookkeeper.mledger.OffloadedLedgerMetadata;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
@@ -60,6 +61,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends BlobStoreManagedLedgerO
private static final Logger log = LoggerFactory.getLogger(BlobStoreManagedLedgerOffloaderTest.class);
private TieredStorageConfiguration mockedConfig;
+ private final LedgerOffloaderStats offloaderStats;
BlobStoreManagedLedgerOffloaderTest() throws Exception {
super();
@@ -68,6 +70,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends BlobStoreManagedLedgerO
assertNotNull(provider);
provider.validate(config);
blobStore = provider.getBlobStore(config);
+ this.offloaderStats = LedgerOffloaderStats.create(true, true, Executors.newScheduledThreadPool(1), 60);
}
private BlobStoreManagedLedgerOffloader getOffloader() throws IOException {
@@ -81,14 +84,14 @@ public class BlobStoreManagedLedgerOffloaderTest extends BlobStoreManagedLedgerO
private BlobStoreManagedLedgerOffloader getOffloader(String bucket) throws IOException {
mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket)));
Mockito.doReturn(blobStore).when(mockedConfig).getBlobStore(); // Use the REAL blobStore
- BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap<String,String>(), scheduler);
+ BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap<String,String>(), scheduler, this.offloaderStats);
return offloader;
}
private BlobStoreManagedLedgerOffloader getOffloader(String bucket, BlobStore mockedBlobStore) throws IOException {
mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket)));
Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore();
- BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap<String,String>(), scheduler);
+ BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap<String,String>(), scheduler, this.offloaderStats);
return offloader;
}
@@ -162,6 +165,41 @@ public class BlobStoreManagedLedgerOffloaderTest extends BlobStoreManagedLedgerO
Assert.assertEquals(toTest.getState(), BlobStoreBackedReadHandleImpl.State.Closed);
}
+ @Test(timeOut = 600000) // 10 minutes.
+ public void testOffloadAndReadMetrics() throws Exception {
+ ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 3);
+ LedgerOffloader offloader = getOffloader();
+
+ UUID uuid = UUID.randomUUID();
+
+ String topic = "test";
+ Map<String, String> extraMap = new HashMap<>();
+ extraMap.put("ManagedLedgerName", topic);
+ offloader.offload(toWrite, uuid, extraMap).get();
+
+ LedgerOffloaderStatsImpl offloaderStats = (LedgerOffloaderStatsImpl) this.offloaderStats;
+
+ assertEquals(offloaderStats.getOffloadError(topic), 0);
+ assertTrue(offloaderStats.getOffloadBytes(topic) > 0 );
+ assertTrue(offloaderStats.getReadLedgerLatency(topic).count > 0);
+ assertEquals(offloaderStats.getWriteStorageError(topic), 0);
+
+ Map<String, String> map = new HashMap<>();
+ map.putAll(offloader.getOffloadDriverMetadata());
+ map.put("ManagedLedgerName", topic);
+ ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, map).get();
+ LedgerEntries toTestEntries = toTest.read(0, toTest.getLastAddConfirmed());
+ Iterator<LedgerEntry> toTestIter = toTestEntries.iterator();
+ while (toTestIter.hasNext()) {
+ LedgerEntry toTestEntry = toTestIter.next();
+ }
+
+ assertEquals(offloaderStats.getReadOffloadError(topic), 0);
+ assertTrue(offloaderStats.getReadOffloadBytes(topic) > 0);
+ assertTrue(offloaderStats.getReadOffloadDataLatency(topic).count > 0);
+ assertTrue(offloaderStats.getReadOffloadIndexLatency(topic).count > 0);
+ }
+
@Test
public void testOffloadFailInitDataBlockUpload() throws Exception {
ReadHandle readHandle = buildReadHandle();
@@ -497,7 +535,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends BlobStoreManagedLedgerO
try {
toTest.readAsync(0, 0).get();
} catch (Exception e) {
- fail("Get unexpected exception when reading entries", e);
+ Assert.fail("Get unexpected exception when reading entries", e);
}
}