You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/03/29 14:38:49 UTC
[pulsar] branch branch-2.8 updated: TieredStorage: add debug information (#14907)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 7ee8113 TieredStorage: add debug information (#14907)
7ee8113 is described below
commit 7ee8113cbae59e55e184393f283e05cb6a42f5c2
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Tue Mar 29 09:47:56 2022 +0200
TieredStorage: add debug information (#14907)
* TieredStorage: add debug information
- enable SLF4 logging in JClouds
- add Pulsar Cluster name in Objects metadata
- log object names during offloading
(cherry picked from commit 83dad0ac855ca81983ff4bf8f5676fe98a55d0ff)
---
jclouds-shaded/pom.xml | 5 +++++
.../org/apache/bookkeeper/mledger/LedgerOffloader.java | 1 +
.../java/org/apache/pulsar/broker/PulsarService.java | 3 ++-
.../jcloud/impl/BlobStoreManagedLedgerOffloader.java | 16 ++++++++++++++--
.../offload/jcloud/provider/JCloudBlobStoreProvider.java | 6 ++++++
5 files changed, 28 insertions(+), 3 deletions(-)
diff --git a/jclouds-shaded/pom.xml b/jclouds-shaded/pom.xml
index 536ab9b..e81f3b6 100644
--- a/jclouds-shaded/pom.xml
+++ b/jclouds-shaded/pom.xml
@@ -40,6 +40,11 @@
<version>${jclouds.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.jclouds.driver</groupId>
+ <artifactId>jclouds-slf4j</artifactId>
+ <version>${jclouds.version}</version>
+ </dependency>
+ <dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</dependency>
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
index ead9db7..4718801 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
@@ -90,6 +90,7 @@ public interface LedgerOffloader {
// TODO: improve the user metadata in subsequent changes
String METADATA_SOFTWARE_VERSION_KEY = "S3ManagedLedgerOffloaderSoftwareVersion";
String METADATA_SOFTWARE_GITSHA_KEY = "S3ManagedLedgerOffloaderSoftwareGitSha";
+ String METADATA_PULSAR_CLUSTER_NAME = "pulsarClusterName";
/**
* Get offload driver name.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 06430aa..504b53f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1237,7 +1237,8 @@ public class PulsarService implements AutoCloseable {
offloadPolicies,
ImmutableMap.of(
LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
- LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha()
+ LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha(),
+ LedgerOffloader.METADATA_PULSAR_CLUSTER_NAME.toLowerCase(), config.getClusterName()
),
schemaStorage,
getOffloaderScheduler(offloadPolicies));
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index f9c86fd..5fc2bf7 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.time.Duration;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -173,6 +174,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
.withDataBlockHeaderLength(BlockAwareSegmentInputStreamImpl.getHeaderSize());
String dataBlockKey = DataBlockUtils.dataBlockOffloadKey(readHandle.getId(), uuid);
String indexBlockKey = DataBlockUtils.indexBlockOffloadKey(readHandle.getId(), uuid);
+ log.info("ledger {} dataBlockKey {} indexBlockKey {}", readHandle.getId(), dataBlockKey, indexBlockKey);
MultipartUpload mpu = null;
List<MultipartPart> parts = Lists.newArrayList();
@@ -180,7 +182,12 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
// init multi part upload for data block.
try {
BlobBuilder blobBuilder = writeBlobStore.blobBuilder(dataBlockKey);
- DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
+ Map<String, String> objectMetadata = new HashMap<>(userMetadata);
+ objectMetadata.put("role", "data");
+ if (extraMetadata != null) {
+ objectMetadata.putAll(extraMetadata);
+ }
+ DataBlockUtils.addVersionInfo(blobBuilder, objectMetadata);
Blob blob = blobBuilder.build();
mpu = writeBlobStore.initiateMultipartUpload(config.getBucket(), blob.getMetadata(), new PutOptions());
} catch (Throwable t) {
@@ -243,7 +250,12 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
IndexInputStream indexStream = index.toStream()) {
// write the index block
BlobBuilder blobBuilder = writeBlobStore.blobBuilder(indexBlockKey);
- DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
+ Map<String, String> objectMetadata = new HashMap<>(userMetadata);
+ objectMetadata.put("role", "index");
+ if (extraMetadata != null) {
+ objectMetadata.putAll(extraMetadata);
+ }
+ DataBlockUtils.addVersionInfo(blobBuilder, objectMetadata);
Payload indexPayload = Payloads.newInputStreamPayload(indexStream);
indexPayload.getContentMetadata().setContentLength((long) indexStream.getStreamSize());
indexPayload.getContentMetadata().setContentType("application/octet-stream");
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
index 49dabb2..44aa92c 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
@@ -38,6 +38,7 @@ import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.Charset;
+import java.util.Arrays;
import java.util.Properties;
import java.util.UUID;
@@ -61,6 +62,7 @@ import org.jclouds.domain.LocationBuilder;
import org.jclouds.domain.LocationScope;
import org.jclouds.googlecloud.GoogleCredentialsFromJson;
import org.jclouds.googlecloudstorage.GoogleCloudStorageProviderMetadata;
+import org.jclouds.logging.slf4j.config.SLF4JLoggingModule;
import org.jclouds.providers.AnonymousProviderMetadata;
import org.jclouds.providers.ProviderMetadata;
import org.jclouds.s3.S3ApiMetadata;
@@ -140,6 +142,7 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
@Override
public BlobStore getBlobStore(TieredStorageConfiguration config) {
ContextBuilder contextBuilder = ContextBuilder.newBuilder(config.getProviderMetadata());
+ contextBuilder.modules(Arrays.asList(new SLF4JLoggingModule()));
contextBuilder.overrides(config.getOverrides());
if (config.getProviderCredentials() != null) {
@@ -205,6 +208,7 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
public BlobStore getBlobStore(TieredStorageConfiguration config) {
ContextBuilder builder = ContextBuilder.newBuilder("transient");
+ builder.modules(Arrays.asList(new SLF4JLoggingModule()));
BlobStoreContext ctx = builder
.buildView(BlobStoreContext.class);
@@ -287,6 +291,7 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
static final BlobStoreBuilder BLOB_STORE_BUILDER = (TieredStorageConfiguration config) -> {
ContextBuilder contextBuilder = ContextBuilder.newBuilder(config.getProviderMetadata());
+ contextBuilder.modules(Arrays.asList(new SLF4JLoggingModule()));
contextBuilder.overrides(config.getOverrides());
if (StringUtils.isNotEmpty(config.getServiceEndpoint())) {
@@ -371,6 +376,7 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
static final BlobStoreBuilder ALIYUN_OSS_BLOB_STORE_BUILDER = (TieredStorageConfiguration config) -> {
ContextBuilder contextBuilder = ContextBuilder.newBuilder(config.getProviderMetadata());
+ contextBuilder.modules(Arrays.asList(new SLF4JLoggingModule()));
Properties overrides = config.getOverrides();
// For security reasons, OSS supports only virtual hosted style access.
overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "true");