You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/21 07:54:30 UTC
[incubator-pulsar] branch master updated: Store offloaded data
object size in index (#1810)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 79b0e28 Store offloaded data object size in index (#1810)
79b0e28 is described below
commit 79b0e28d15e1e8c3fcdb4b29aee0956a3a31ebc8
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Mon May 21 09:54:28 2018 +0200
Store offloaded data object size in index (#1810)
We need the size of the data object to set a bound on the stream we
read from S3. Without the size in the index we need to do an extra
call to S3 which is undesirable.
Master Issue: #1511
---
.../apache/pulsar/broker/s3offload/OffloadIndexBlock.java | 6 +++++-
.../pulsar/broker/s3offload/OffloadIndexBlockBuilder.java | 8 +++++++-
.../pulsar/broker/s3offload/S3ManagedLedgerOffloader.java | 7 +++++--
.../s3offload/impl/OffloadIndexBlockBuilderImpl.java | 12 ++++++++++--
.../broker/s3offload/impl/OffloadIndexBlockImpl.java | 14 +++++++++++++-
.../broker/s3offload/impl/S3BackedReadHandleImpl.java | 3 +--
.../pulsar/broker/s3offload/impl/OffloadIndexTest.java | 4 +++-
7 files changed, 44 insertions(+), 10 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
index 8f9d3ce..944edca 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
@@ -36,7 +36,7 @@ public interface OffloadIndexBlock extends Closeable {
* Get the content of the index block as InputStream.
* Read out in format:
* | index_magic_header | index_block_len | index_entry_count |
- * |segment_metadata_length | segment metadata | index entries |
+ * | data_object_size | segment_metadata_length | segment metadata | index entries ... |
*/
InputStream toStream() throws IOException;
@@ -59,5 +59,9 @@ public interface OffloadIndexBlock extends Closeable {
*/
LedgerMetadata getLedgerMetadata();
+ /**
+ * Get the total size of the data object.
+ */
+ long getDataObjectLength();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
index 8ec0395..c60ce88 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
@@ -37,7 +37,7 @@ public interface OffloadIndexBlockBuilder {
*
* @param metadata the ledger metadata
*/
- OffloadIndexBlockBuilder withMetadata(LedgerMetadata metadata);
+ OffloadIndexBlockBuilder withLedgerMetadata(LedgerMetadata metadata);
/**
* Add one payload block related information into index block.
@@ -52,6 +52,12 @@ public interface OffloadIndexBlockBuilder {
OffloadIndexBlockBuilder addBlock(long firstEntryId, int partId, int blockSize);
/**
+ * Specify the length of data object this index is associated with.
+ * @param dataObjectLength the length of the data object
+ */
+ OffloadIndexBlockBuilder withDataObjectLength(long dataObjectLength);
+
+ /**
* Finalize the immutable OffloadIndexBlock
*/
OffloadIndexBlock build();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
index 7a73a3b..276488d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
@@ -110,7 +110,7 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
CompletableFuture<Void> promise = new CompletableFuture<>();
scheduler.submit(() -> {
OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create()
- .withMetadata(readHandle.getLedgerMetadata());
+ .withLedgerMetadata(readHandle.getLedgerMetadata());
String dataBlockKey = dataBlockOffloadKey(readHandle.getId(), uuid);
String indexBlockKey = indexBlockOffloadKey(readHandle.getId(), uuid);
InitiateMultipartUploadRequest dataBlockReq = new InitiateMultipartUploadRequest(bucket, dataBlockKey);
@@ -124,6 +124,7 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
return;
}
+ long dataObjectLength = 0;
// start multi part upload for data block.
try {
long startEntry = 0;
@@ -157,6 +158,8 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
entryBytesWritten += blockStream.getBlockEntryBytesCount();
partId++;
}
+
+ dataObjectLength += blockSize;
}
s3client.completeMultipartUpload(new CompleteMultipartUploadRequest()
@@ -171,7 +174,7 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
}
// upload index block
- try (OffloadIndexBlock index = indexBuilder.build();
+ try (OffloadIndexBlock index = indexBuilder.withDataObjectLength(dataObjectLength).build();
InputStream indexStream = index.toStream()) {
// write the index block
ObjectMetadata metadata = new ObjectMetadata();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java
index 766083d..3b0f899 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.broker.s3offload.OffloadIndexBlockBuilder;
public class OffloadIndexBlockBuilderImpl implements OffloadIndexBlockBuilder {
private LedgerMetadata ledgerMetadata;
+ private long dataObjectLength;
private List<OffloadIndexEntryImpl> entries;
private int lastBlockSize;
@@ -42,7 +43,13 @@ public class OffloadIndexBlockBuilderImpl implements OffloadIndexBlockBuilder {
}
@Override
- public OffloadIndexBlockBuilder withMetadata(LedgerMetadata metadata) {
+ public OffloadIndexBlockBuilder withDataObjectLength(long dataObjectLength) {
+ this.dataObjectLength = dataObjectLength;
+ return this;
+ }
+
+ @Override
+ public OffloadIndexBlockBuilder withLedgerMetadata(LedgerMetadata metadata) {
this.ledgerMetadata = metadata;
return this;
}
@@ -73,7 +80,8 @@ public class OffloadIndexBlockBuilderImpl implements OffloadIndexBlockBuilder {
public OffloadIndexBlock build() {
checkState(ledgerMetadata != null);
checkState(!entries.isEmpty());
- return OffloadIndexBlockImpl.get(ledgerMetadata, entries);
+ checkState(dataObjectLength > 0);
+ return OffloadIndexBlockImpl.get(ledgerMetadata, dataObjectLength, entries);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java
index f43c503..638edc4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java
@@ -54,6 +54,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
private static final int INDEX_MAGIC_WORD = 0xDE47DE47;
private LedgerMetadata segmentMetadata;
+ private long dataObjectLength;
private TreeMap<Long, OffloadIndexEntryImpl> indexEntries;
private final Handle<OffloadIndexBlockImpl> recyclerHandle;
@@ -69,12 +70,14 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
this.recyclerHandle = recyclerHandle;
}
- public static OffloadIndexBlockImpl get(LedgerMetadata metadata, List<OffloadIndexEntryImpl> entries) {
+ public static OffloadIndexBlockImpl get(LedgerMetadata metadata, long dataObjectLength,
+ List<OffloadIndexEntryImpl> entries) {
OffloadIndexBlockImpl block = RECYCLER.get();
block.indexEntries = Maps.newTreeMap();
entries.forEach(entry -> block.indexEntries.putIfAbsent(entry.getEntryId(), entry));
checkState(entries.size() == block.indexEntries.size());
block.segmentMetadata = metadata;
+ block.dataObjectLength = dataObjectLength;
return block;
}
@@ -86,6 +89,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
}
public void recycle() {
+ dataObjectLength = -1;
segmentMetadata = null;
indexEntries.clear();
indexEntries = null;
@@ -116,6 +120,11 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
return this.segmentMetadata;
}
+ @Override
+ public long getDataObjectLength() {
+ return this.dataObjectLength;
+ }
+
private static byte[] buildLedgerMetadataFormat(LedgerMetadata metadata) {
LedgerMetadataFormat.Builder builder = LedgerMetadataFormat.newBuilder();
builder.setQuorumSize(metadata.getWriteQuorumSize())
@@ -159,6 +168,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
indexBlockLength = 4 /* magic header */
+ 4 /* index block length */
+ + 8 /* data object length */
+ 4 /* segment metadata length */
+ 4 /* index entry count */
+ segmentMetadataLength
@@ -168,6 +178,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
out.writeInt(INDEX_MAGIC_WORD)
.writeInt(indexBlockLength)
+ .writeLong(dataObjectLength)
.writeInt(segmentMetadataLength)
.writeInt(indexEntryCount);
@@ -306,6 +317,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
magic, INDEX_MAGIC_WORD));
}
int indexBlockLength = dis.readInt();
+ this.dataObjectLength = dis.readLong();
int segmentMetadataLength = dis.readInt();
int indexEntryCount = dis.readInt();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java
index 037ea67..984af59 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java
@@ -197,9 +197,8 @@ public class S3BackedReadHandleImpl implements ReadHandle {
OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create();
OffloadIndexBlock index = indexBuilder.fromStream(obj.getObjectContent());
- ObjectMetadata dataMetadata = s3client.getObjectMetadata(bucket, key); // FIXME: this should be part of index
S3BackedInputStream inputStream = new S3BackedInputStreamImpl(s3client, bucket, key,
- dataMetadata.getContentLength(),
+ index.getDataObjectLength(),
readBufferSize);
return new S3BackedReadHandleImpl(ledgerId, index, inputStream, executor);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java
index 916c4cd..af9d6e4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java
@@ -108,7 +108,7 @@ public class OffloadIndexTest {
LedgerMetadata metadata = createLedgerMetadata();
log.debug("created metadata: {}", metadata.toString());
- blockBuilder.withMetadata(metadata);
+ blockBuilder.withLedgerMetadata(metadata).withDataObjectLength(1);
blockBuilder.addBlock(0, 2, 64 * 1024 * 1024);
blockBuilder.addBlock(1000, 3, 64 * 1024 * 1024);
@@ -161,6 +161,7 @@ public class OffloadIndexTest {
ByteBuf wrapper = Unpooled.wrappedBuffer(b);
int magic = wrapper.readInt();
int indexBlockLength = wrapper.readInt();
+ long dataObjectLength = wrapper.readLong();
int segmentMetadataLength = wrapper.readInt();
int indexEntryCount = wrapper.readInt();
@@ -168,6 +169,7 @@ public class OffloadIndexTest {
assertEquals(magic, OffloadIndexBlockImpl.getIndexMagicWord());
assertEquals(indexBlockLength, readoutLen);
assertEquals(indexEntryCount, 3);
+ assertEquals(dataObjectLength, 1);
wrapper.readBytes(segmentMetadataLength);
log.debug("magic: {}, blockLength: {}, metadataLength: {}, indexCount: {}",
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.