You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/05/21 07:54:30 UTC

[GitHub] sijie closed pull request #1810: Store offloaded data object size in index

sijie closed pull request #1810: Store offloaded data object size in index
URL: https://github.com/apache/incubator-pulsar/pull/1810
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign (fork-based) pull
request once it has been merged, the diff is reproduced below for the
sake of provenance:

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
index 8f9d3cecb1..944edcaf09 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
@@ -36,7 +36,7 @@
      * Get the content of the index block as InputStream.
      * Read out in format:
      *   | index_magic_header | index_block_len | index_entry_count |
-     *   |segment_metadata_length | segment metadata | index entries |
+     *   | data_object_size | segment_metadata_length | segment metadata | index entries ... |
      */
     InputStream toStream() throws IOException;
 
@@ -59,5 +59,9 @@
      */
     LedgerMetadata getLedgerMetadata();
 
+    /**
+     * Get the total size of the data object.
+     */
+    long getDataObjectLength();
 }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
index 8ec0395498..c60ce88967 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
@@ -37,7 +37,7 @@
      *
      * @param metadata the ledger metadata
      */
-    OffloadIndexBlockBuilder withMetadata(LedgerMetadata metadata);
+    OffloadIndexBlockBuilder withLedgerMetadata(LedgerMetadata metadata);
 
     /**
      * Add one payload block related information into index block.
@@ -51,6 +51,12 @@
      */
     OffloadIndexBlockBuilder addBlock(long firstEntryId, int partId, int blockSize);
 
+    /**
+     * Specify the length of the data object this index is associated with.
+     * @param dataObjectLength the length of the data object
+     */
+    OffloadIndexBlockBuilder withDataObjectLength(long dataObjectLength);
+
     /**
      * Finalize the immutable OffloadIndexBlock
      */
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
index 7a73a3bbc8..276488de36 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
@@ -110,7 +110,7 @@ static String indexBlockOffloadKey(long ledgerId, UUID uuid) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
         scheduler.submit(() -> {
             OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create()
-                .withMetadata(readHandle.getLedgerMetadata());
+                .withLedgerMetadata(readHandle.getLedgerMetadata());
             String dataBlockKey = dataBlockOffloadKey(readHandle.getId(), uuid);
             String indexBlockKey = indexBlockOffloadKey(readHandle.getId(), uuid);
             InitiateMultipartUploadRequest dataBlockReq = new InitiateMultipartUploadRequest(bucket, dataBlockKey);
@@ -124,6 +124,7 @@ static String indexBlockOffloadKey(long ledgerId, UUID uuid) {
                 return;
             }
 
+            long dataObjectLength = 0;
             // start multi part upload for data block.
             try {
                 long startEntry = 0;
@@ -157,6 +158,8 @@ static String indexBlockOffloadKey(long ledgerId, UUID uuid) {
                         entryBytesWritten += blockStream.getBlockEntryBytesCount();
                         partId++;
                     }
+
+                    dataObjectLength += blockSize;
                 }
 
                 s3client.completeMultipartUpload(new CompleteMultipartUploadRequest()
@@ -171,7 +174,7 @@ static String indexBlockOffloadKey(long ledgerId, UUID uuid) {
             }
 
             // upload index block
-            try (OffloadIndexBlock index = indexBuilder.build();
+            try (OffloadIndexBlock index = indexBuilder.withDataObjectLength(dataObjectLength).build();
                  InputStream indexStream = index.toStream()) {
                 // write the index block
                 ObjectMetadata metadata = new ObjectMetadata();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java
index 766083dcb3..3b0f8995e2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java
@@ -34,6 +34,7 @@
 public class OffloadIndexBlockBuilderImpl implements OffloadIndexBlockBuilder {
 
     private LedgerMetadata ledgerMetadata;
+    private long dataObjectLength;
     private List<OffloadIndexEntryImpl> entries;
     private int lastBlockSize;
 
@@ -42,7 +43,13 @@ public OffloadIndexBlockBuilderImpl() {
     }
 
     @Override
-    public OffloadIndexBlockBuilder withMetadata(LedgerMetadata metadata) {
+    public OffloadIndexBlockBuilder withDataObjectLength(long dataObjectLength) {
+        this.dataObjectLength = dataObjectLength;
+        return this;
+    }
+
+    @Override
+    public OffloadIndexBlockBuilder withLedgerMetadata(LedgerMetadata metadata) {
         this.ledgerMetadata = metadata;
         return this;
     }
@@ -73,7 +80,8 @@ public OffloadIndexBlock fromStream(InputStream is) throws IOException {
     public OffloadIndexBlock build() {
         checkState(ledgerMetadata != null);
         checkState(!entries.isEmpty());
-        return OffloadIndexBlockImpl.get(ledgerMetadata, entries);
+        checkState(dataObjectLength > 0);
+        return OffloadIndexBlockImpl.get(ledgerMetadata, dataObjectLength, entries);
     }
 
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java
index f43c50367e..638edc4451 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java
@@ -54,6 +54,7 @@
     private static final int INDEX_MAGIC_WORD = 0xDE47DE47;
 
     private LedgerMetadata segmentMetadata;
+    private long dataObjectLength;
     private TreeMap<Long, OffloadIndexEntryImpl> indexEntries;
 
     private final Handle<OffloadIndexBlockImpl> recyclerHandle;
@@ -69,12 +70,14 @@ private OffloadIndexBlockImpl(Handle<OffloadIndexBlockImpl> recyclerHandle) {
         this.recyclerHandle = recyclerHandle;
     }
 
-    public static OffloadIndexBlockImpl get(LedgerMetadata metadata, List<OffloadIndexEntryImpl> entries) {
+    public static OffloadIndexBlockImpl get(LedgerMetadata metadata, long dataObjectLength,
+                                            List<OffloadIndexEntryImpl> entries) {
         OffloadIndexBlockImpl block = RECYCLER.get();
         block.indexEntries = Maps.newTreeMap();
         entries.forEach(entry -> block.indexEntries.putIfAbsent(entry.getEntryId(), entry));
         checkState(entries.size() == block.indexEntries.size());
         block.segmentMetadata = metadata;
+        block.dataObjectLength = dataObjectLength;
         return block;
     }
 
@@ -86,6 +89,7 @@ public static OffloadIndexBlockImpl get(InputStream stream) throws IOException {
     }
 
     public void recycle() {
+        dataObjectLength = -1;
         segmentMetadata = null;
         indexEntries.clear();
         indexEntries = null;
@@ -116,6 +120,11 @@ public LedgerMetadata getLedgerMetadata() {
         return this.segmentMetadata;
     }
 
+    @Override
+    public long getDataObjectLength() {
+        return this.dataObjectLength;
+    }
+
     private static byte[] buildLedgerMetadataFormat(LedgerMetadata metadata) {
         LedgerMetadataFormat.Builder builder = LedgerMetadataFormat.newBuilder();
         builder.setQuorumSize(metadata.getWriteQuorumSize())
@@ -159,6 +168,7 @@ public InputStream toStream() throws IOException {
 
         indexBlockLength = 4 /* magic header */
             + 4 /* index block length */
+            + 8 /* data object length */
             + 4 /* segment metadata length */
             + 4 /* index entry count */
             + segmentMetadataLength
@@ -168,6 +178,7 @@ public InputStream toStream() throws IOException {
 
         out.writeInt(INDEX_MAGIC_WORD)
             .writeInt(indexBlockLength)
+            .writeLong(dataObjectLength)
             .writeInt(segmentMetadataLength)
             .writeInt(indexEntryCount);
 
@@ -306,6 +317,7 @@ private OffloadIndexBlock fromStream(InputStream stream) throws IOException {
                                                 magic, INDEX_MAGIC_WORD));
         }
         int indexBlockLength = dis.readInt();
+        this.dataObjectLength = dis.readLong();
         int segmentMetadataLength = dis.readInt();
         int indexEntryCount = dis.readInt();
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java
index 037ea6726c..984af597a4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java
@@ -197,9 +197,8 @@ public static ReadHandle open(ScheduledExecutorService executor,
             OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create();
             OffloadIndexBlock index = indexBuilder.fromStream(obj.getObjectContent());
 
-            ObjectMetadata dataMetadata = s3client.getObjectMetadata(bucket, key); // FIXME: this should be part of index
             S3BackedInputStream inputStream = new S3BackedInputStreamImpl(s3client, bucket, key,
-                                                                          dataMetadata.getContentLength(),
+                                                                          index.getDataObjectLength(),
                                                                           readBufferSize);
             return new S3BackedReadHandleImpl(ledgerId, index, inputStream, executor);
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java
index 916c4cda57..af9d6e4084 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java
@@ -108,7 +108,7 @@ public void offloadIndexBlockImplTest() throws Exception {
         LedgerMetadata metadata = createLedgerMetadata();
         log.debug("created metadata: {}", metadata.toString());
 
-        blockBuilder.withMetadata(metadata);
+        blockBuilder.withLedgerMetadata(metadata).withDataObjectLength(1);
 
         blockBuilder.addBlock(0, 2, 64 * 1024 * 1024);
         blockBuilder.addBlock(1000, 3, 64 * 1024 * 1024);
@@ -161,6 +161,7 @@ public void offloadIndexBlockImplTest() throws Exception {
         ByteBuf wrapper = Unpooled.wrappedBuffer(b);
         int magic = wrapper.readInt();
         int indexBlockLength = wrapper.readInt();
+        long dataObjectLength = wrapper.readLong();
         int segmentMetadataLength = wrapper.readInt();
         int indexEntryCount = wrapper.readInt();
 
@@ -168,6 +169,7 @@ public void offloadIndexBlockImplTest() throws Exception {
         assertEquals(magic, OffloadIndexBlockImpl.getIndexMagicWord());
         assertEquals(indexBlockLength, readoutLen);
         assertEquals(indexEntryCount, 3);
+        assertEquals(dataObjectLength, 1);
 
         wrapper.readBytes(segmentMetadataLength);
         log.debug("magic: {}, blockLength: {}, metadataLength: {}, indexCount: {}",


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services