Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/01/20 05:26:38 UTC

[GitHub] [pulsar] zymap commented on a change in pull request #9096: PIP 76: Streaming Offload(Part I)

zymap commented on a change in pull request #9096:
URL: https://github.com/apache/pulsar/pull/9096#discussion_r560659329



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -1685,6 +1685,14 @@ void asyncReadEntries(OpReadEntry opReadEntry) {
         return getLedgerHandle(ledgerId).thenApply(rh -> rh.getLedgerMetadata().toSafeString());
     }
 
+    @Override
+    public CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId) {
+        CompletableFuture<LedgerInfo> result = new CompletableFuture<>();
+        final LedgerInfo ledgerInfo = ledgers.get(ledgerId);

Review comment:
       Do we need to check the returned value? It might be null.
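
A minimal sketch of the kind of check being asked about, assuming the lookup is a plain map `get` that returns null for an unknown ledger id. `LedgerLookupSketch`, the `String` stand-in for `LedgerInfo`, and the choice to fail the future rather than complete it with null are all hypothetical, not taken from the PR:

```java
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical stand-in for the managed ledger; LedgerInfo is modeled as a String.
class LedgerLookupSketch {
    private final NavigableMap<Long, String> ledgers = new ConcurrentSkipListMap<>();

    void put(long ledgerId, String info) {
        ledgers.put(ledgerId, info);
    }

    CompletableFuture<String> getLedgerInfo(long ledgerId) {
        CompletableFuture<String> result = new CompletableFuture<>();
        String info = ledgers.get(ledgerId);
        if (info == null) {
            // Unknown ledger: surface the problem instead of handing null to the caller.
            result.completeExceptionally(
                    new NoSuchElementException("Ledger not found: " + ledgerId));
        } else {
            result.complete(info);
        }
        return result;
    }
}
```

Whether a missing ledger should fail the future or complete it with null is a design decision for the PR author; the sketch only shows that the null case is handled explicitly.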

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -85,6 +150,34 @@
                                     UUID uid,
                                     Map<String, String> extraMetadata);
 
+    /**
+     * Begin offload the passed in ledgers to longterm storage, it will finish
+     * when a segment reached it's size or time.
+     * Metadata passed in is for inspection purposes only and should be stored
+     * alongside the segment data.
+     *
+     * When the returned OffloaderHandle.getOffloadResultAsync completes, the corresponding
+     * ledgers has been persisted to the
+     * longterm storage, so it is safe to delete the original copy in bookkeeper.
+     *
+     * The uid is used to identify an attempt to offload. The implementation should
+     * use this to deterministically generate a unique name for the offloaded object.
+     * This uid will be stored in the managed ledger metadata before attempting the
+     * call to offload(). If a subsequent or concurrent call to streamingOffload() finds

Review comment:
       Should this be streamingOffload()?

##########
File path: managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
##########
@@ -97,21 +97,21 @@ public void testOffloadRead() throws Exception {
             assertEquals(new String(e.getData()), "entry-" + i++);
         }
         verify(offloader, times(1))
-            .readOffloaded(anyLong(), any(), anyMap());
+                .readOffloaded(anyLong(), (UUID) any(), anyMap());

Review comment:
       any() should match any type, so why do we need to cast it to UUID?
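
One possible explanation, offered as an assumption since the diff shown here does not confirm it: if the PR adds a second `readOffloaded` overload whose second parameter has a different type, an untyped `any()` makes the call ambiguous for the compiler, and the cast selects one overload. The sketch below reproduces that situation with plain Java; the `any()` helper and both overloads are hypothetical stand-ins, not Mockito or Pulsar code:

```java
import java.util.Map;
import java.util.UUID;

class AnyCastSketch {
    // Minimal stand-in for a generic matcher such as Mockito's any(); it just returns null.
    static <T> T any() {
        return null;
    }

    // Two hypothetical overloads that differ only in the type of the second parameter.
    static String readOffloaded(long ledgerId, UUID uid, Map<String, String> metadata) {
        return "uuid-overload";
    }

    static String readOffloaded(long ledgerId, String context, Map<String, String> metadata) {
        return "string-overload";
    }

    static String callWithCast() {
        // readOffloaded(1L, any(), null) would not compile here: both overloads apply
        // and neither is more specific. The cast tells the compiler which one is meant.
        return readOffloaded(1L, (UUID) any(), null);
    }
}
```

With Mockito itself, `any(UUID.class)` is another way to give the compiler the type.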

##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -230,6 +266,213 @@ public String getOffloadDriverName() {
         return promise;
     }
 
+    BlobStore blobStore;
+    String streamingDataBlockKey;
+    String streamingDataIndexKey;
+    MultipartUpload streamingMpu = null;
+    List<MultipartPart> streamingParts = Lists.newArrayList();
+
+    @Override
+    public CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uuid, long beginLedger,
+                                                             long beginEntry,
+                                                             Map<String, String> driverMetadata) {
+        this.ml = ml;
+        this.segmentInfo = new OffloadSegmentInfoImpl(uuid, beginLedger, beginEntry, config.getDriver(),
+                driverMetadata);
+        log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+        this.offloadResult = new CompletableFuture<>();
+        blobStore = blobStores.get(config.getBlobStoreLocation());
+        streamingIndexBuilder = StreamingOffloadIndexBlockBuilder.create();
+        streamingDataBlockKey = segmentInfo.uuid.toString();
+        streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);

Review comment:
       If there is another call to `streamingOffload`, will these variables affect the previous offload process? The offload loop that follows still uses the same variables.
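
The concern can be illustrated with a small sketch, assuming the offloader instance is reused across calls. `SharedStateSketch` and its methods are hypothetical and heavily simplified; they only mirror the pattern of assigning per-offload values to instance fields:

```java
import java.util.UUID;
import java.util.function.Supplier;

class SharedStateSketch {
    // Instance field shared by every call, like streamingDataBlockKey in the diff.
    private String dataBlockKey;

    // Each call overwrites the field; the returned task reads it later.
    Supplier<String> startShared(UUID uuid) {
        dataBlockKey = uuid.toString();
        return () -> dataBlockKey;
    }

    // Alternative: keep per-call state in a local that the task captures.
    Supplier<String> startIsolated(UUID uuid) {
        final String key = uuid.toString();
        return () -> key;
    }
}
```

With `startShared`, a task created by the first call observes the key written by the second call. With `startIsolated`, each task keeps the key it was created with, which is one way to make overlapping offloads independent.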

##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -230,6 +266,213 @@ public String getOffloadDriverName() {
         return promise;
     }
 
+    BlobStore blobStore;
+    String streamingDataBlockKey;
+    String streamingDataIndexKey;
+    MultipartUpload streamingMpu = null;
+    List<MultipartPart> streamingParts = Lists.newArrayList();
+
+    @Override
+    public CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uuid, long beginLedger,
+                                                             long beginEntry,
+                                                             Map<String, String> driverMetadata) {
+        this.ml = ml;
+        this.segmentInfo = new OffloadSegmentInfoImpl(uuid, beginLedger, beginEntry, config.getDriver(),
+                driverMetadata);
+        log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+        this.offloadResult = new CompletableFuture<>();
+        blobStore = blobStores.get(config.getBlobStoreLocation());
+        streamingIndexBuilder = StreamingOffloadIndexBlockBuilder.create();
+        streamingDataBlockKey = segmentInfo.uuid.toString();
+        streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
+        BlobBuilder blobBuilder = blobStore.blobBuilder(streamingDataBlockKey);
+        DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
+        Blob blob = blobBuilder.build();
+        streamingMpu = blobStore

Review comment:
       Same as above.



