Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/01/12 13:02:34 UTC

[GitHub] [pulsar] eolivelli commented on a change in pull request #9096: PIP 76: Streaming Offload(Part I)

eolivelli commented on a change in pull request #9096:
URL: https://github.com/apache/pulsar/pull/9096#discussion_r555100688



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -85,6 +181,34 @@
                                     UUID uid,
                                     Map<String, String> extraMetadata);
 
+    /**
+     * Begin offloading the passed-in ledgers to long-term storage. The offload
+     * finishes when a segment reaches its size or time limit.
+     * Metadata passed in is for inspection purposes only and should be stored
+     * alongside the segment data.
+     *
+     * When the returned OffloaderHandle.getOffloadResultAsync completes, the corresponding
+     * ledgers have been persisted to
+     * long-term storage, so it is safe to delete the original copy in BookKeeper.
+     *
+     * The uid is used to identify an attempt to offload. The implementation should

Review comment:
       typo: uid (the parameter in the signature is named `uuid`)

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -85,6 +181,34 @@
                                     UUID uid,
                                     Map<String, String> extraMetadata);
 
+    /**
+     * Begin offloading the passed-in ledgers to long-term storage. The offload
+     * finishes when a segment reaches its size or time limit.
+     * Metadata passed in is for inspection purposes only and should be stored
+     * alongside the segment data.
+     *
+     * When the returned OffloaderHandle.getOffloadResultAsync completes, the corresponding
+     * ledgers have been persisted to
+     * long-term storage, so it is safe to delete the original copy in BookKeeper.
+     *
+     * The uid is used to identify an attempt to offload. The implementation should
+     * use this to deterministically generate a unique name for the offloaded object.
+     * This uid will be stored in the managed ledger metadata before attempting the
+     * call to offload(). If a subsequent or concurrent call to streamingOffload() finds
+     * a uid in the metadata, it will attempt to clean up this attempt with a call
+     * to #deleteOffloaded(ReadHandle,UUID). Once the offload attempt completes,
+     * the managed ledger will update its metadata again, to record the completion,
+     * ensuring that subsequent calls will not attempt to offload the same ledger
+     * again.
+     *
+     * @return an OffloaderHandle; when its `completeFuture()` completes, the offload has been successful.
+     */
+    default CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uuid, long beginLedger,
+                                                              long beginEntry,

Review comment:
       what happens if I call this method multiple times for the same ManagedLedger, with overlapping intervals?
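
   For illustration, a minimal sketch of one possible guard, assuming a single offloader instance should only drive one streaming segment at a time (this guard is hypothetical, not part of the PR):

       // Hypothetical sketch: reject a second streamingOffload() call while a
       // segment is still in flight, instead of silently replacing segmentInfo.
       @Override
       public synchronized CompletableFuture<OffloadHandle> streamingOffload(
               ManagedLedger ml, UUID uuid, long beginLedger, long beginEntry,
               Map<String, String> driverMetadata) {
           if (segmentInfo != null && !segmentInfo.isClosed()) {
               CompletableFuture<OffloadHandle> failed = new CompletableFuture<>();
               failed.completeExceptionally(new IllegalStateException(
                       "streaming offload already in progress for " + segmentInfo));
               return failed;
           }
           // ... continue as in the current implementation ...
       }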

##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -230,6 +260,183 @@ public String getOffloadDriverName() {
         return promise;
     }
 
+    BlobStore blobStore;
+    String streamingDataBlockKey;
+    String streamingDataIndexKey;
+    MultipartUpload streamingMpu = null;
+    List<MultipartPart> streamingParts = Lists.newArrayList();
+
+    @Override
+    public CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uuid, long beginLedger,
+                                                             long beginEntry,
+                                                             Map<String, String> driverMetadata) {
+        this.ml = ml;
+        this.segmentInfo = new SegmentInfoImpl(uuid, beginLedger, beginEntry, config.getDriver(), driverMetadata);
+        log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+        this.offloadResult = new CompletableFuture<>();
+        blobStore = blobStores.get(config.getBlobStoreLocation());
+        streamingIndexBuilder = new StreamingOffloadIndexBlockBuilderImpl();
+        streamingDataBlockKey = segmentInfo.uuid.toString();
+        streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
+        BlobBuilder blobBuilder = blobStore.blobBuilder(streamingDataBlockKey);
+        DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
+        Blob blob = blobBuilder.build();
+        streamingMpu = blobStore
+                .initiateMultipartUpload(config.getBucket(), blob.getMetadata(), new PutOptions());
+
+        scheduler.chooseThread(segmentInfo).execute(() -> {
+            log.info("start offloading segment: {}", segmentInfo);
+            streamingOffloadLoop(1, 0);
+        });
+        scheduler.schedule(this::closeSegment, segmentCloseTime.toMillis(), TimeUnit.MILLISECONDS);
+
+        return CompletableFuture.completedFuture(new OffloadHandle() {
+            @Override
+            public boolean canOffer(long size) {
+                return BlobStoreManagedLedgerOffloader.this.canOffer(size);
+            }
+
+            @Override
+            public PositionImpl lastOffered() {
+                return BlobStoreManagedLedgerOffloader.this.lastOffered();
+            }
+
+            @Override
+            public boolean offerEntry(EntryImpl entry) throws OffloadSegmentClosedException,
+                    OffloadNotConsecutiveException {
+                return BlobStoreManagedLedgerOffloader.this.offerEntry(entry);
+            }
+
+            @Override
+            public CompletableFuture<OffloadResult> getOffloadResultAsync() {
+                return BlobStoreManagedLedgerOffloader.this.getOffloadResultAsync();
+            }
+        });
+    }
+
+    private void streamingOffloadLoop(int partId, int dataObjectLength) {
+        log.debug("streaming offload loop {} {}", partId, dataObjectLength);
+        if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+            offloadResult.complete(segmentInfo.result());
+            return;
+        }
+        final BufferedOffloadStream payloadStream;
+
+        while (offloadBuffer.isEmpty()) {
+            if (segmentInfo.isClosed()) {
+                offloadResult.complete(segmentInfo.result());
+            } else {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        final Entry peek = offloadBuffer.peek();
+        //initialize payload when there is at least one entry
+        final long blockLedgerId = peek.getLedgerId();
+        final long blockEntryId = peek.getEntryId();
+        payloadStream = new BufferedOffloadStream(streamingBlockSize, offloadBuffer, segmentInfo,
+                blockLedgerId, blockEntryId, bufferLength);
+        try {
+            streamingIndexBuilder.addLedgerMeta(blockLedgerId, ml.getRawLedgerMetadata(blockLedgerId).get());
+            streamingIndexBuilder.withDataBlockHeaderLength(StreamingDataBlockHeaderImpl.getDataStartOffset());
+        } catch (InterruptedException | ExecutionException e) {
+            offloadResult.completeExceptionally(e);
+            return;
+        }
+        log.debug("begin upload payload");
+        Payload partPayload = Payloads.newInputStreamPayload(payloadStream);
+        partPayload.getContentMetadata().setContentType("application/octet-stream");
+        streamingParts.add(blobStore.uploadMultipartPart(streamingMpu, partId, partPayload));
+        streamingIndexBuilder.addBlock(blockLedgerId, blockEntryId, partId, streamingBlockSize);
+
+        log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}",
+                config.getBucket(), streamingDataBlockKey, partId, streamingMpu.id());
+        if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+            try {
+                blobStore.completeMultipartUpload(streamingMpu, streamingParts);
+                streamingIndexBuilder.withDataObjectLength(dataObjectLength + streamingBlockSize);
+                final StreamingOffloadIndexBlock index = streamingIndexBuilder.build();
+                final StreamingOffloadIndexBlock.IndexInputStream indexStream = index.toStream();
+                final BlobBuilder indexBlobBuilder = blobStore.blobBuilder(streamingDataIndexKey);
+                DataBlockUtils.addVersionInfo(indexBlobBuilder, userMetadata);
+                final InputStreamPayload indexPayLoad = Payloads.newInputStreamPayload(indexStream);
+                indexPayLoad.getContentMetadata().setContentLength(indexStream.getStreamSize());
+                indexPayLoad.getContentMetadata().setContentType("application/octet-stream");
+                final Blob indexBlob = indexBlobBuilder.payload(indexPayLoad)
+                        .contentLength(indexStream.getStreamSize())
+                        .build();
+                blobStore.putBlob(config.getBucket(), indexBlob);
+
+                offloadResult.complete(segmentInfo.result());
+            } catch (Exception e) {
+                log.error("streaming offload failed", e);
+                offloadResult.completeExceptionally(e);
+            }
+        } else {
+            scheduler.chooseThread(segmentInfo)
+                    .execute(() -> streamingOffloadLoop(partId + 1, dataObjectLength + streamingBlockSize));
+        }
+    }
+
+    private CompletableFuture<OffloadResult> getOffloadResultAsync() {
+        return this.offloadResult;
+    }
+
+    private synchronized boolean offerEntry(EntryImpl entry) throws OffloadSegmentClosedException,
+            OffloadNotConsecutiveException {
+        if (segmentInfo.isClosed()) {
+            throw new OffloadSegmentClosedException("Segment already closed " + segmentInfo);
+        } else {
+            if (!naiveCheckConsecutive(lastOfferedPosition, entry.getPosition())) {
+                throw new OffloadNotConsecutiveException(
+                        Strings.lenientFormat("position %s and %s are not consecutive", lastOfferedPosition,
+                                entry.getPosition()));
+            }
+            entry.retain();
+            offloadBuffer.add(entry);
+            bufferLength.getAndAdd(entry.getLength());
+            segmentLength.getAndAdd(entry.getLength());
+            lastOfferedPosition = entry.getPosition();
+            if (segmentLength.get() >= maxSegmentLength) {
+                closeSegment();
+            }
+            return true;
+        }
+    }
+
+    private synchronized void closeSegment() {
+        log.debug("close segment {} {}", lastOfferedPosition.getLedgerId(), lastOfferedPosition.getEntryId());
+        this.segmentInfo.closeSegment(lastOfferedPosition.getLedgerId(), lastOfferedPosition.getEntryId());
+    }
+
+    private boolean naiveCheckConsecutive(PositionImpl lastOfferedPosition, PositionImpl offeringPosition) {

Review comment:
       nit: could this method be static?
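
   As a sketch, the static form could look like this; the method body is not visible in this diff, so the consecutiveness rule below is only a guess for illustration:

       // Hypothetical static version: the check depends only on its parameters,
       // not on instance state. The rule shown here is a guess, not the PR's logic.
       private static boolean naiveCheckConsecutive(PositionImpl lastOffered,
                                                    PositionImpl offering) {
           // next entry in the same ledger, or the first entry of a later ledger
           return (offering.getLedgerId() == lastOffered.getLedgerId()
                       && offering.getEntryId() == lastOffered.getEntryId() + 1)
                   || (offering.getLedgerId() > lastOffered.getLedgerId()
                       && offering.getEntryId() == 0);
       }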

##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -230,6 +260,183 @@ public String getOffloadDriverName() {
         return promise;
     }
 
+    BlobStore blobStore;
+    String streamingDataBlockKey;
+    String streamingDataIndexKey;
+    MultipartUpload streamingMpu = null;
+    List<MultipartPart> streamingParts = Lists.newArrayList();
+
+    @Override
+    public CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uuid, long beginLedger,
+                                                             long beginEntry,
+                                                             Map<String, String> driverMetadata) {
+        this.ml = ml;
+        this.segmentInfo = new SegmentInfoImpl(uuid, beginLedger, beginEntry, config.getDriver(), driverMetadata);
+        log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+        this.offloadResult = new CompletableFuture<>();
+        blobStore = blobStores.get(config.getBlobStoreLocation());
+        streamingIndexBuilder = new StreamingOffloadIndexBlockBuilderImpl();
+        streamingDataBlockKey = segmentInfo.uuid.toString();
+        streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
+        BlobBuilder blobBuilder = blobStore.blobBuilder(streamingDataBlockKey);
+        DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
+        Blob blob = blobBuilder.build();
+        streamingMpu = blobStore
+                .initiateMultipartUpload(config.getBucket(), blob.getMetadata(), new PutOptions());
+
+        scheduler.chooseThread(segmentInfo).execute(() -> {
+            log.info("start offloading segment: {}", segmentInfo);
+            streamingOffloadLoop(1, 0);
+        });
+        scheduler.schedule(this::closeSegment, segmentCloseTime.toMillis(), TimeUnit.MILLISECONDS);

Review comment:
       shouldn't we perform this operation only after the loop above has finished?
   What happens if streamingOffloadLoop does not complete in time?
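
   One hedged way to reduce that race, assuming the scheduler's schedule() returns a ScheduledFuture (a sketch, not the PR's code): keep the timed close as a deadline, but cancel it once the offload result completes:

       ScheduledFuture<?> closeTask = scheduler.schedule(
               this::closeSegment, segmentCloseTime.toMillis(), TimeUnit.MILLISECONDS);
       // cancel the deadline once the segment result is in, so a slow
       // streamingOffloadLoop is not closed underneath by a stale timer
       offloadResult.whenComplete((result, error) -> closeTask.cancel(false));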

##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -230,6 +260,183 @@ public String getOffloadDriverName() {
         return promise;
     }
 
+    BlobStore blobStore;
+    String streamingDataBlockKey;
+    String streamingDataIndexKey;
+    MultipartUpload streamingMpu = null;
+    List<MultipartPart> streamingParts = Lists.newArrayList();
+
+    @Override
+    public CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uuid, long beginLedger,
+                                                             long beginEntry,
+                                                             Map<String, String> driverMetadata) {
+        this.ml = ml;
+        this.segmentInfo = new SegmentInfoImpl(uuid, beginLedger, beginEntry, config.getDriver(), driverMetadata);
+        log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+        this.offloadResult = new CompletableFuture<>();
+        blobStore = blobStores.get(config.getBlobStoreLocation());
+        streamingIndexBuilder = new StreamingOffloadIndexBlockBuilderImpl();
+        streamingDataBlockKey = segmentInfo.uuid.toString();
+        streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
+        BlobBuilder blobBuilder = blobStore.blobBuilder(streamingDataBlockKey);
+        DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
+        Blob blob = blobBuilder.build();
+        streamingMpu = blobStore
+                .initiateMultipartUpload(config.getBucket(), blob.getMetadata(), new PutOptions());
+
+        scheduler.chooseThread(segmentInfo).execute(() -> {
+            log.info("start offloading segment: {}", segmentInfo);
+            streamingOffloadLoop(1, 0);
+        });
+        scheduler.schedule(this::closeSegment, segmentCloseTime.toMillis(), TimeUnit.MILLISECONDS);
+
+        return CompletableFuture.completedFuture(new OffloadHandle() {
+            @Override
+            public boolean canOffer(long size) {
+                return BlobStoreManagedLedgerOffloader.this.canOffer(size);
+            }
+
+            @Override
+            public PositionImpl lastOffered() {
+                return BlobStoreManagedLedgerOffloader.this.lastOffered();
+            }
+
+            @Override
+            public boolean offerEntry(EntryImpl entry) throws OffloadSegmentClosedException,
+                    OffloadNotConsecutiveException {
+                return BlobStoreManagedLedgerOffloader.this.offerEntry(entry);
+            }
+
+            @Override
+            public CompletableFuture<OffloadResult> getOffloadResultAsync() {
+                return BlobStoreManagedLedgerOffloader.this.getOffloadResultAsync();
+            }
+        });
+    }
+
+    private void streamingOffloadLoop(int partId, int dataObjectLength) {
+        log.debug("streaming offload loop {} {}", partId, dataObjectLength);
+        if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+            offloadResult.complete(segmentInfo.result());
+            return;
+        }
+        final BufferedOffloadStream payloadStream;
+
+        while (offloadBuffer.isEmpty()) {
+            if (segmentInfo.isClosed()) {
+                offloadResult.complete(segmentInfo.result());
+            } else {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        final Entry peek = offloadBuffer.peek();
+        //initialize payload when there is at least one entry
+        final long blockLedgerId = peek.getLedgerId();
+        final long blockEntryId = peek.getEntryId();
+        payloadStream = new BufferedOffloadStream(streamingBlockSize, offloadBuffer, segmentInfo,
+                blockLedgerId, blockEntryId, bufferLength);
+        try {
+            streamingIndexBuilder.addLedgerMeta(blockLedgerId, ml.getRawLedgerMetadata(blockLedgerId).get());
+            streamingIndexBuilder.withDataBlockHeaderLength(StreamingDataBlockHeaderImpl.getDataStartOffset());
+        } catch (InterruptedException | ExecutionException e) {
+            offloadResult.completeExceptionally(e);
+            return;
+        }
+        log.debug("begin upload payload");

Review comment:
       we could provide more information in this log message
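
   For example, the values already in scope in this method could be included (a suggestion only):

       log.debug("begin uploading part {} of segment {} (block starts at {}:{}, "
                       + "data object length so far: {})",
               partId, segmentInfo, blockLedgerId, blockEntryId, dataObjectLength);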

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -115,6 +217,16 @@
     CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
                                             Map<String, String> offloadDriverMetadata);
 
+    default CompletableFuture<ReadHandle> readOffloaded(long ledgerId, MLDataFormats.OffloadContext ledgerContext,
+                                                        Map<String, String> offloadDriverMetadata) {
+        throw new UnsupportedClassVersionError();

Review comment:
       you should return a CompletableFuture that is completed exceptionally with the UnsupportedOperationException;
   the caller of a method that returns a Future does not expect it to throw synchronously. (Note that the default implementation currently throws UnsupportedClassVersionError, which looks like a typo for UnsupportedOperationException.)
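
   A minimal sketch of that suggestion, built with a plain CompletableFuture so it also works on Java 8 (CompletableFuture.failedFuture is only available from Java 9):

       default CompletableFuture<ReadHandle> readOffloaded(long ledgerId,
                                                           MLDataFormats.OffloadContext ledgerContext,
                                                           Map<String, String> offloadDriverMetadata) {
           CompletableFuture<ReadHandle> future = new CompletableFuture<>();
           future.completeExceptionally(
                   new UnsupportedOperationException("streaming offload is not supported"));
           return future;
       }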

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -115,6 +239,16 @@
     CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
                                             Map<String, String> offloadDriverMetadata);
 
+    default CompletableFuture<ReadHandle> readOffloaded(long ledgerId, MLDataFormats.OffloadContext ledgerContext,
+                                                        Map<String, String> offloadDriverMetadata) {
+        throw new UnsupportedClassVersionError();
+    }
+
+    default CompletableFuture<Void> deleteOffloaded(UUID uid,
+                                                    Map<String, String> offloadDriverMetadata) {
+        throw new UnsupportedOperationException();

Review comment:
       the same applies here
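
   Sketched the same way:

       default CompletableFuture<Void> deleteOffloaded(UUID uid,
                                                       Map<String, String> offloadDriverMetadata) {
           CompletableFuture<Void> future = new CompletableFuture<>();
           future.completeExceptionally(
                   new UnsupportedOperationException("streaming offload is not supported"));
           return future;
       }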




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org