You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/21 03:13:07 UTC
[pulsar] 02/02: [Tiered Storage] Fix merge conflicts introduced by
PR #6335 (#8630)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8793963ccdd8ce778a681c139b5ab4fab7b820a2
Author: ran <ga...@126.com>
AuthorDate: Sat Nov 21 09:22:18 2020 +0800
[Tiered Storage] Fix merge conflicts introduced by PR #6335 (#8630)
# Motivation
The PR #6335 lost some PR changes, related PRs as below.
1. PR 4196 (2019/5/29 Merli)
Configure static PulsarByteBufAllocator to handle OOM errors (#4196)
2. PR 5356 (2019/10/30 Kelly)
[TIEREDSTORAGE] Only seek when reading unexpected entry (#5356)
3. PR 4433 (2019/6/4 Higham)
[tiered-storage] Add support for AWS instance and role creds (#4433)
(cherry picked from commit 68759ff405bca60343ddb8318cc10f3727981a5d)
---
.../impl/BlobStoreBackedInputStreamImpl.java | 7 +--
.../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 20 +++++----
.../impl/BlobStoreManagedLedgerOffloader.java | 7 ---
.../impl/BlockAwareSegmentInputStreamImpl.java | 6 +--
.../offload/jcloud/impl/DataBlockHeaderImpl.java | 9 +++-
.../offload/jcloud/impl/OffloadIndexBlockImpl.java | 4 +-
.../offload/jcloud/impl/OffloadIndexEntryImpl.java | 6 +++
.../jcloud/provider/JCloudBlobStoreProvider.java | 50 ++++++++++++++++------
.../provider/TieredStorageConfiguration.java | 11 +++--
.../impl/BlobStoreManagedLedgerOffloaderBase.java | 7 +--
10 files changed, 83 insertions(+), 44 deletions(-)
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
index 386196c..6a204d5 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
@@ -19,11 +19,11 @@
package org.apache.bookkeeper.mledger.offload.jcloud.impl;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
import java.io.IOException;
import java.io.InputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.options.GetOptions;
@@ -52,7 +52,7 @@ public class BlobStoreBackedInputStreamImpl extends BackedInputStream {
this.bucket = bucket;
this.key = key;
this.versionCheck = versionCheck;
- this.buffer = PooledByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize);
+ this.buffer = PulsarByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize);
this.objectLen = objectLen;
this.bufferSize = bufferSize;
this.cursor = 0;
@@ -116,7 +116,8 @@ public class BlobStoreBackedInputStreamImpl extends BackedInputStream {
@Override
public void seek(long position) {
- log.debug("Seeking to {} on {}/{}, current position {}", position, bucket, key, cursor);
+ log.debug("Seeking to {} on {}/{}, current position {} (bufStart:{}, bufEnd:{})",
+ position, bucket, key, cursor, bufferOffsetStart, bufferOffsetEnd);
if (position >= bufferOffsetStart && position <= bufferOffsetEnd) {
long newIndex = position - bufferOffsetStart;
buffer.readerIndex((int) newIndex);
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index 92f2514..b48751e 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -19,7 +19,6 @@
package org.apache.bookkeeper.mledger.offload.jcloud.impl;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
@@ -38,8 +37,8 @@ import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
-import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.domain.Blob;
import org.slf4j.Logger;
@@ -104,19 +103,16 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
long nextExpectedId = firstEntry;
try {
- OffloadIndexEntry entry = index.getIndexEntryForEntry(firstEntry);
- inputStream.seek(entry.getDataOffset());
-
while (entriesToRead > 0) {
int length = dataStream.readInt();
if (length < 0) { // hit padding or new block
- inputStream.seekForward(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
- length = dataStream.readInt();
+ inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
+ continue;
}
long entryId = dataStream.readLong();
if (entryId == nextExpectedId) {
- ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(length, length);
+ ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length);
entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
int toWrite = length;
while (toWrite > 0) {
@@ -124,6 +120,14 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
}
entriesToRead--;
nextExpectedId++;
+ } else if (entryId > nextExpectedId) {
+ inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
+ continue;
+ } else if (entryId < nextExpectedId
+ && !index.getIndexEntryForEntry(nextExpectedId).equals(
+ index.getIndexEntryForEntry(entryId))) {
+ inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
+ continue;
} else if (entryId > lastEntry) {
log.info("Expected to read {}, but read {}, which is greater than last entry {}",
nextExpectedId, entryId, lastEntry);
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index 6643a83..4f51713 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -67,7 +67,6 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
private final OrderedScheduler scheduler;
private final TieredStorageConfiguration config;
-// private final BlobStore writeBlobStore;
private final Location writeLocation;
// metadata to be stored as part of the offloaded ledger metadata
@@ -116,11 +115,6 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
return config.getOffloadDriverMetadata();
}
-// @VisibleForTesting
-// public ConcurrentMap<BlobStoreLocation, BlobStore> getBlobStores() {
-// return blobStores;
-// }
-
/**
* Upload the DataBlocks associated with the given ReadHandle using MultiPartUpload,
* Creating indexBlocks for each corresponding DataBlock that is uploaded.
@@ -303,7 +297,6 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
@Override
public OffloadPolicies getOffloadPolicies() {
- // TODO Auto-generated method stub
Properties properties = new Properties();
properties.putAll(config.getConfigProperties());
return OffloadPolicies.create(properties);
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
index dc7a68a..dcc693a 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
@@ -23,7 +23,6 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
@@ -33,6 +32,7 @@ import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.offload.jcloud.BlockAwareSegmentInputStream;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -126,8 +126,8 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
int entryLength = buf.readableBytes();
long entryId = entry.getEntryId();
- CompositeByteBuf entryBuf = PooledByteBufAllocator.DEFAULT.compositeBuffer(2);
- ByteBuf entryHeaderBuf = PooledByteBufAllocator.DEFAULT.buffer(ENTRY_HEADER_SIZE, ENTRY_HEADER_SIZE);
+ CompositeByteBuf entryBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(2);
+ ByteBuf entryHeaderBuf = PulsarByteBufAllocator.DEFAULT.buffer(ENTRY_HEADER_SIZE, ENTRY_HEADER_SIZE);
entryHeaderBuf.writeInt(entryLength).writeLong(entryId);
entryBuf.addComponents(true, entryHeaderBuf, buf);
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java
index 9e3fe90..9239ec2 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java
@@ -28,6 +28,7 @@ import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.DataBlockHeader;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
/**
* The data block header in code storage for each data block.
@@ -110,7 +111,7 @@ public class DataBlockHeaderImpl implements DataBlockHeader {
*/
@Override
public InputStream toStream() {
- ByteBuf out = PooledByteBufAllocator.DEFAULT.buffer(HEADER_MAX_SIZE, HEADER_MAX_SIZE);
+ ByteBuf out = PulsarByteBufAllocator.DEFAULT.buffer(HEADER_MAX_SIZE, HEADER_MAX_SIZE);
out.writeInt(MAGIC_WORD)
.writeLong(headerLength)
.writeLong(blockLength)
@@ -120,5 +121,11 @@ public class DataBlockHeaderImpl implements DataBlockHeader {
// true means the input stream will release the ByteBuf on close
return new ByteBufInputStream(out, true);
}
+
+ @Override
+ public String toString() {
+ return String.format("DataBlockHeader(len:%d,hlen:%d,firstEntry:%d)",
+ blockLength, headerLength, firstEntryId);
+ }
}
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
index 0a4e90b..b34dce5 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
-import io.netty.buffer.PooledByteBufAllocator;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
@@ -42,6 +41,7 @@ import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.DataFormats;
import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -156,7 +156,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
+ segmentMetadataLength
+ indexEntryCount * (8 + 4 + 8); /* messageEntryId + blockPartId + blockOffset */
- ByteBuf out = PooledByteBufAllocator.DEFAULT.buffer(indexBlockLength, indexBlockLength);
+ ByteBuf out = PulsarByteBufAllocator.DEFAULT.buffer(indexBlockLength, indexBlockLength);
out.writeInt(INDEX_MAGIC_WORD)
.writeInt(indexBlockLength)
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java
index 5dce79d..10408fe 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java
@@ -56,5 +56,11 @@ public class OffloadIndexEntryImpl implements OffloadIndexEntry {
this.offset = offset;
this.blockHeaderSize = blockHeaderSize;
}
+
+ @Override
+ public String toString() {
+ return String.format("[eid:%d, part:%d, offset:%d, doffset:%d]",
+ entryId, partId, offset, getDataOffset());
+ }
}
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
index ba03f64..df4b0d3 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
@@ -18,8 +18,14 @@
*/
package org.apache.bookkeeper.mledger.offload.jcloud.provider;
+import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.GCS_ACCOUNT_KEY_FILE_FIELD;
+import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.S3_ROLE_FIELD;
+import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.S3_ROLE_SESSION_NAME_FIELD;
+
import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSSessionCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.google.common.base.Strings;
import com.google.common.io.Files;
@@ -36,8 +42,8 @@ import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfig
import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.CredentialBuilder;
import org.apache.commons.lang3.StringUtils;
-import org.jclouds.Constants;
import org.jclouds.ContextBuilder;
+import org.jclouds.aws.domain.SessionCredentials;
import org.jclouds.aws.s3.AWSS3ProviderMetadata;
import org.jclouds.azureblob.AzureBlobProviderMetadata;
import org.jclouds.blobstore.BlobStore;
@@ -106,9 +112,8 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
if (config.getCredentials() == null) {
try {
String gcsKeyContent = Files.toString(
- new File(config.getConfigProperty("gcsManagedLedgerOffloadServiceAccountKeyFile")),
- Charset.defaultCharset());
- config.setProviderCredentials(new GoogleCredentialsFromJson(gcsKeyContent).get());
+ new File(config.getConfigProperty(GCS_ACCOUNT_KEY_FILE_FIELD)), Charset.defaultCharset());
+ config.setProviderCredentials(() -> new GoogleCredentialsFromJson(gcsKeyContent).get());
} catch (IOException ioe) {
log.error("Cannot read GCS service account credentials file: {}",
config.getConfigProperty("gcsManagedLedgerOffloadServiceAccountKeyFile"));
@@ -139,7 +144,7 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
if (StringUtils.isEmpty(accountKey)) {
throw new IllegalArgumentException("Couldn't get the azure storage access key.");
}
- config.setProviderCredentials(new Credentials(accountName, accountKey));
+ config.setProviderCredentials(() -> new Credentials(accountName, accountKey));
}
},
@@ -246,8 +251,7 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
if (config.getProviderCredentials() != null) {
return contextBuilder
- .credentials(config.getProviderCredentials().identity,
- config.getProviderCredentials().credential)
+ .credentialsSupplier(config.getCredentials())
.buildView(BlobStoreContext.class)
.getBlobStore();
} else {
@@ -262,17 +266,35 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
if (config.getCredentials() == null) {
AWSCredentials awsCredentials = null;
try {
- DefaultAWSCredentialsProviderChain creds = DefaultAWSCredentialsProviderChain.getInstance();
- awsCredentials = creds.getCredentials();
+ if (Strings.isNullOrEmpty(config.getConfigProperty(S3_ROLE_FIELD))) {
+ awsCredentials = DefaultAWSCredentialsProviderChain.getInstance().getCredentials();
+ } else {
+ awsCredentials =
+ new STSAssumeRoleSessionCredentialsProvider.Builder(
+ config.getConfigProperty(S3_ROLE_FIELD),
+ config.getConfigProperty(S3_ROLE_SESSION_NAME_FIELD)
+ ).build().getCredentials();
+ }
+
+ if (awsCredentials instanceof AWSSessionCredentials) {
+ // if we have session credentials, we need to send the session token
+ // this allows us to support EC2 metadata credentials
+ SessionCredentials sessionCredentials = SessionCredentials.builder()
+ .accessKeyId(awsCredentials.getAWSAccessKeyId())
+ .secretAccessKey(awsCredentials.getAWSSecretKey())
+ .sessionToken(((AWSSessionCredentials) awsCredentials).getSessionToken())
+ .build();
+ config.setProviderCredentials(() -> sessionCredentials);
+ } else {
+ Credentials credentials = new Credentials(
+ awsCredentials.getAWSAccessKeyId(), awsCredentials.getAWSSecretKey());
+ config.setProviderCredentials(() -> credentials);
+ }
+
} catch (Exception e) {
// allowed, some mock s3 service do not need credential
log.warn("Exception when get credentials for s3 ", e);
}
- if (awsCredentials != null) {
- config.setProviderCredentials(
- new Credentials(awsCredentials.getAWSAccessKeyId(),
- awsCredentials.getAWSSecretKey()));
- }
}
};
}
\ No newline at end of file
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
index 5abd867..ac2bed9 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
@@ -35,6 +35,7 @@ import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.jcloud.shade.com.google.common.base.Supplier;
import org.jclouds.Constants;
import org.jclouds.aws.s3.AWSS3ProviderMetadata;
import org.jclouds.blobstore.BlobStore;
@@ -69,6 +70,10 @@ public class TieredStorageConfiguration implements Serializable, Cloneable {
protected static final int MB = 1024 * 1024;
+ public static final String GCS_ACCOUNT_KEY_FILE_FIELD = "gcsManagedLedgerOffloadServiceAccountKeyFile";
+ public static final String S3_ROLE_FIELD = "s3ManagedLedgerOffloadRole";
+ public static final String S3_ROLE_SESSION_NAME_FIELD = "s3ManagedLedgerOffloadRoleSessionName";
+
public static TieredStorageConfiguration create(Properties props) throws IOException {
Map<String, String> map = new HashMap<String, String>();
map.putAll(props.entrySet()
@@ -86,7 +91,7 @@ public class TieredStorageConfiguration implements Serializable, Cloneable {
@Getter
private final Map<String, String> configProperties;
@Getter
- private Credentials credentials;
+ private Supplier<Credentials> credentials;
private JCloudBlobStoreProvider provider;
public TieredStorageConfiguration(Map<String, String> configProperties) {
@@ -221,14 +226,14 @@ public class TieredStorageConfiguration implements Serializable, Cloneable {
return new Integer(MB);
}
- public Credentials getProviderCredentials() {
+ public Supplier<Credentials> getProviderCredentials() {
if (credentials == null) {
getProvider().buildCredentials(this);
}
return credentials;
}
- public void setProviderCredentials(Credentials credentials) {
+ public void setProviderCredentials(Supplier<Credentials> credentials) {
this.credentials = credentials;
}
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
index ea613e7..a2944f5 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
@@ -38,6 +38,7 @@ import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProv
import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.jcloud.shade.com.google.common.base.Supplier;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.data.ACL;
@@ -99,14 +100,14 @@ public abstract class BlobStoreManagedLedgerOffloaderBase {
* Get the credentials to use for the JCloud provider
* based on the System properties.
*/
- protected static Credentials getBlobStoreCredentials() {
+ protected static Supplier<Credentials> getBlobStoreCredentials() {
if (Boolean.parseBoolean(System.getProperty("testRealAWS", "false"))) {
/* To use this, must config credentials using "aws_access_key_id" as S3ID,
* and "aws_secret_access_key" as S3Key. And bucket should exist in default region. e.g.
* props.setProperty("S3ID", "AXXXXXXQ");
* props.setProperty("S3Key", "HXXXXXĂ");
*/
- return new Credentials(System.getProperty("S3ID"), System.getProperty("S3Key"));
+ return () -> new Credentials(System.getProperty("S3ID"), System.getProperty("S3Key"));
} else if (Boolean.parseBoolean(System.getProperty("testRealGCS", "false"))) {
/*
@@ -115,7 +116,7 @@ public abstract class BlobStoreManagedLedgerOffloaderBase {
* props.setProperty("GCSID", "5XXXXXXXXXX6-compute@developer.gserviceaccount.com");
* props.setProperty("GCSKey", "XXXXXX");
*/
- return new Credentials(System.getProperty("GCSID"), System.getProperty("GCSKey"));
+ return () -> new Credentials(System.getProperty("GCSID"), System.getProperty("GCSKey"));
} else {
return null;
}