You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/26 02:29:58 UTC
[pulsar] branch master updated: PIP 76: Streaming Offload(Part I)
(#9096)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 17f399e PIP 76: Streaming Offload(Part I) (#9096)
17f399e is described below
commit 17f399ebebaaf03b969deea971f3320595398c54
Author: Renkai Ge <ga...@gmail.com>
AuthorDate: Tue Jan 26 10:29:17 2021 +0800
PIP 76: Streaming Offload(Part I) (#9096)
This PR contains the new interface and implementation of the offloader in the below PIP
Unit test is still in progress
- [x] StreamingDataBlockHeaderImpl
- [x] StreamingBlobStoreBackedReadHandleImpl
- [x] BufferedOffloadStream
- [x] BlobStoreManagedLedgerOffloader
- [x] StreamingOffloadIndexBlock
PIP 76: https://github.com/apache/pulsar/wiki/PIP-76:-Streaming-Offload
---
.../apache/bookkeeper/mledger/LedgerOffloader.java | 97 ++++-
.../apache/bookkeeper/mledger/ManagedLedger.java | 9 +-
.../org/apache/bookkeeper/mledger/Position.java | 4 +
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 +-
.../mledger/impl/OffloadSegmentInfoImpl.java | 61 +++
.../bookkeeper/mledger/impl/PositionImpl.java | 1 -
managed-ledger/src/main/proto/MLDataFormats.proto | 11 +
.../mledger/impl/OffloadPrefixReadTest.java | 10 +-
.../mledger/offload/jcloud/DataBlockHeader.java | 2 +-
.../mledger/offload/jcloud/OffloadIndexBlock.java | 15 +-
.../offload/jcloud/OffloadIndexBlockBuilder.java | 6 +-
...oadIndexBlock.java => OffloadIndexBlockV2.java} | 32 +-
...uilder.java => OffloadIndexBlockV2Builder.java} | 23 +-
.../mledger/offload/jcloud/OffloadIndexEntry.java | 6 +-
.../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 15 +-
.../impl/BlobStoreBackedReadHandleImplV2.java | 302 ++++++++++++++
.../impl/BlobStoreManagedLedgerOffloader.java | 341 +++++++++++++++-
.../impl/BlockAwareSegmentInputStreamImpl.java | 3 +-
.../offload/jcloud/impl/BufferedOffloadStream.java | 135 +++++++
.../offload/jcloud/impl/DataBlockHeaderImpl.java | 4 +-
.../offload/jcloud/impl/DataBlockUtils.java | 6 +-
.../jcloud/impl/OffloadIndexBlockBuilderImpl.java | 97 -----
.../offload/jcloud/impl/OffloadIndexBlockImpl.java | 22 +-
.../impl/OffloadIndexBlockV2BuilderImpl.java | 150 +++++++
.../jcloud/impl/OffloadIndexBlockV2Impl.java | 379 ++++++++++++++++++
.../offload/jcloud/impl/OffloadIndexEntryImpl.java | 3 +
...Impl.java => StreamingDataBlockHeaderImpl.java} | 82 ++--
.../provider/TieredStorageConfiguration.java | 50 +++
.../impl/BlobStoreManagedLedgerOffloaderBase.java | 42 +-
...obStoreManagedLedgerOffloaderStreamingTest.java | 445 +++++++++++++++++++++
.../jcloud/impl/BufferedOffloadStreamTest.java | 196 +++++++++
.../offload/jcloud/impl/DataBlockHeaderV2Test.java | 87 ++++
.../offload/jcloud/impl/MockManagedLedger.java | 328 +++++++++++++++
.../offload/jcloud/impl/OffloadIndexTest.java | 25 +-
.../offload/jcloud/impl/OffloadIndexV2Test.java | 327 +++++++++++++++
.../jcloud/src/test/resources/log4j2-test.yml | 61 +++
36 files changed, 3121 insertions(+), 266 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
index 4552199..32fbc9c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
@@ -22,19 +22,71 @@ import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-
+import lombok.ToString;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
/**
- * Interface for offloading ledgers to long-term storage
+ * Interface for offloading ledgers to long-term storage.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Evolving
public interface LedgerOffloader {
+ @ToString
+ class OffloadResult {
+ public final long beginLedger;
+ public final long beginEntry;
+ public final long endLedger;
+ public final long endEntry;
+
+ public OffloadResult(long beginLedger, long beginEntry, long endLedger, long endEntry) {
+ this.beginLedger = beginLedger;
+ this.beginEntry = beginEntry;
+ this.endLedger = endLedger;
+ this.endEntry = endEntry;
+ }
+ }
+
+ /**
+ * Used to store driver info, buffer entries, mark progress, etc.
+ * Create one per segment.
+ */
+ interface OffloadHandle {
+ enum OfferEntryResult {
+ SUCCESS,
+ FAIL_BUFFER_FULL,
+ FAIL_SEGMENT_CLOSED,
+ FAIL_NOT_CONSECUTIVE
+ }
+
+ Position lastOffered();
+
+ CompletableFuture<Position> lastOfferedAsync();
+
+ /**
+ * The caller should manually release entry no matter what the offer result is.
+ */
+ OfferEntryResult offerEntry(Entry entry);
+
+ CompletableFuture<OfferEntryResult> offerEntryAsync(Entry entry);
+
+ CompletableFuture<OffloadResult> getOffloadResultAsync();
+
+ /**
+ * Manually close current offloading segment
+ * @return true if the segment is not already closed
+ */
+ boolean close();
+
+ default CompletableFuture<Boolean> AsyncClose() {
+ return CompletableFuture.completedFuture(close());
+ }
+ }
+
// TODO: improve the user metadata in subsequent changes
String METADATA_SOFTWARE_VERSION_KEY = "S3ManagedLedgerOffloaderSoftwareVersion";
String METADATA_SOFTWARE_GITSHA_KEY = "S3ManagedLedgerOffloaderSoftwareGitSha";
@@ -63,7 +115,7 @@ public interface LedgerOffloader {
* alongside the ledger data.
*
* When the returned future completes, the ledger has been persisted to the
- * loadterm storage, so it is safe to delete the original copy in bookkeeper.
+ * longterm storage, so it is safe to delete the original copy in bookkeeper.
*
* The uid is used to identify an attempt to offload. The implementation should
* use this to deterministically generate a unique name for the offloaded object.
@@ -86,6 +138,35 @@ public interface LedgerOffloader {
Map<String, String> extraMetadata);
/**
+ * Begin offload the passed in ledgers to longterm storage, it will finish
+ * when a segment reached it's size or time.
+ * Should only be called once for a LedgerOffloader instance.
+ * Metadata passed in is for inspection purposes only and should be stored
+ * alongside the segment data.
+ *
+ * When the returned OffloaderHandle.getOffloadResultAsync completes, the corresponding
+ * ledgers has been persisted to the
+ * longterm storage, so it is safe to delete the original copy in bookkeeper.
+ *
+ * The uid is used to identify an attempt to offload. The implementation should
+ * use this to deterministically generate a unique name for the offloaded object.
+ * This uid will be stored in the managed ledger metadata before attempting the
+ * call to streamingOffload(). If a subsequent or concurrent call to streamingOffload() finds
+ * a uid in the metadata, it will attempt to cleanup this attempt with a call
+ * to #deleteOffloaded(ReadHandle,UUID). Once the offload attempt completes,
+ * the managed ledger will update its metadata again, to record the completion,
+ * ensuring that subsequent calls will not attempt to offload the same ledger
+ * again.
+ *
+ * @return an OffloaderHandle, which when `completeFuture()` completed, denotes that the offload has been successful.
+ */
+ default CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uid, long beginLedger,
+ long beginEntry,
+ Map<String, String> driverMetadata) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
* Create a ReadHandle which can be used to read a ledger back from longterm
* storage.
*
@@ -115,6 +196,16 @@ public interface LedgerOffloader {
CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
Map<String, String> offloadDriverMetadata);
+ default CompletableFuture<ReadHandle> readOffloaded(long ledgerId, MLDataFormats.OffloadContext ledgerContext,
+ Map<String, String> offloadDriverMetadata) {
+ throw new UnsupportedOperationException();
+ }
+
+ default CompletableFuture<Void> deleteOffloaded(UUID uid,
+ Map<String, String> offloadDriverMetadata) {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Get offload policies of this LedgerOffloader
*
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 4f4c226..a2e0ba0 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -19,10 +19,8 @@
package org.apache.bookkeeper.mledger;
import io.netty.buffer.ByteBuf;
-
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
@@ -33,6 +31,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
/**
@@ -595,4 +594,10 @@ public interface ManagedLedger {
* Get the ManagedLedgerInterceptor for ManagedLedger.
* */
ManagedLedgerInterceptor getManagedLedgerInterceptor();
+
+ /**
+ * Get basic ledger summary.
+ * will got null if corresponding ledger not exists.
+ */
+ CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId);
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Position.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Position.java
index 1f34c8c..c76e7b7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Position.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Position.java
@@ -34,4 +34,8 @@ public interface Position {
* @return the position of the next logical entry
*/
Position getNext();
+
+ long getLedgerId();
+
+ long getEntryId();
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 7241ee5..1c4b08e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -34,6 +34,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
+import io.netty.util.ReferenceCountUtil;
import java.time.Clock;
import java.util.Collections;
import java.util.HashMap;
@@ -64,7 +65,6 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
-import io.netty.util.ReferenceCountUtil;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
@@ -1683,6 +1683,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
return getLedgerHandle(ledgerId).thenApply(rh -> rh.getLedgerMetadata().toSafeString());
}
+ @Override
+ public CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId) {
+ CompletableFuture<LedgerInfo> result = new CompletableFuture<>();
+ final LedgerInfo ledgerInfo = ledgers.get(ledgerId);
+ result.complete(ledgerInfo);
+ return result;
+ }
+
CompletableFuture<ReadHandle> getLedgerHandle(long ledgerId) {
CompletableFuture<ReadHandle> ledgerHandle = ledgerCache.get(ledgerId);
if (ledgerHandle != null) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OffloadSegmentInfoImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OffloadSegmentInfoImpl.java
new file mode 100644
index 0000000..cca1676
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OffloadSegmentInfoImpl.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+
+import java.util.Map;
+import java.util.UUID;
+import lombok.ToString;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+
+@ToString
+public class OffloadSegmentInfoImpl {
+ public OffloadSegmentInfoImpl(UUID uuid, long beginLedgerId, long beginEntryId, String driverName,
+ Map<String, String> driverMetadata) {
+ this.uuid = uuid;
+ this.beginLedgerId = beginLedgerId;
+ this.beginEntryId = beginEntryId;
+ this.driverName = driverName;
+ this.driverMetadata = driverMetadata;
+ }
+
+
+ public final UUID uuid;
+ public final long beginLedgerId;
+ public final long beginEntryId;
+ public final String driverName;
+ volatile private long endLedgerId;
+ volatile private long endEntryId;
+ volatile boolean closed = false;
+ public final Map<String, String> driverMetadata;
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ public void closeSegment(long endLedger, long endEntry) {
+ this.endLedgerId = endLedger;
+ this.endEntryId = endEntry;
+ this.closed = true;
+ }
+
+ public LedgerOffloader.OffloadResult result() {
+ return new LedgerOffloader.OffloadResult(beginLedgerId, beginEntryId, endLedgerId, endEntryId);
+ }
+}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
index ef445f9..50382f9 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
@@ -19,7 +19,6 @@
package org.apache.bookkeeper.mledger.impl;
import static com.google.common.base.Preconditions.checkNotNull;
-
import com.google.common.base.Objects;
import com.google.common.collect.ComparisonChain;
import org.apache.bookkeeper.mledger.Position;
diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto
index db3b4ff..a5be8e4 100644
--- a/managed-ledger/src/main/proto/MLDataFormats.proto
+++ b/managed-ledger/src/main/proto/MLDataFormats.proto
@@ -38,6 +38,17 @@ message OffloadContext {
optional bool bookkeeperDeleted = 4;
optional int64 timestamp = 5;
optional OffloadDriverMetadata driverMetadata = 6;
+ repeated OffloadSegment offloadSegment = 7;
+}
+
+message OffloadSegment {
+ optional int64 uidMsb = 1;
+ optional int64 uidLsb = 2;
+ optional bool complete = 3;
+ optional int64 assignedTimestamp = 4; //timestamp in millisecond
+ optional int64 offloadedTimestamp = 5; //timestamp in millisecond
+ optional int64 endEntryId = 6;
+ optional OffloadDriverMetadata driverMetadata = 7;
}
message ManagedLedgerInfo {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
index 69011cac..a3d2283 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
@@ -97,21 +97,21 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
assertEquals(new String(e.getData()), "entry-" + i++);
}
verify(offloader, times(1))
- .readOffloaded(anyLong(), any(), anyMap());
+ .readOffloaded(anyLong(), (UUID) any(), anyMap());
verify(offloader).readOffloaded(anyLong(), eq(firstLedgerUUID), anyMap());
for (Entry e : cursor.readEntries(10)) {
assertEquals(new String(e.getData()), "entry-" + i++);
}
verify(offloader, times(2))
- .readOffloaded(anyLong(), any(), anyMap());
+ .readOffloaded(anyLong(), (UUID) any(), anyMap());
verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID), anyMap());
for (Entry e : cursor.readEntries(5)) {
assertEquals(new String(e.getData()), "entry-" + i++);
}
verify(offloader, times(2))
- .readOffloaded(anyLong(), any(), anyMap());
+ .readOffloaded(anyLong(), (UUID) any(), anyMap());
}
@Test
@@ -164,7 +164,7 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
}
// For offloaded first and not deleted ledgers, they should be read from bookkeeper.
verify(offloader, never())
- .readOffloaded(anyLong(), any(), anyMap());
+ .readOffloaded(anyLong(), (UUID) any(), anyMap());
// Delete offladed message from bookkeeper
assertEventuallyTrue(() -> bkc.getLedgers().contains(firstLedger.getLedgerId()));
@@ -186,7 +186,7 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
// Ledgers deleted from bookkeeper, now should read from offloader
verify(offloader, atLeastOnce())
- .readOffloaded(anyLong(), any(), anyMap());
+ .readOffloaded(anyLong(), (UUID) any(), anyMap());
verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID), anyMap());
}
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/DataBlockHeader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/DataBlockHeader.java
index 6a7c1eb..0a3565c 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/DataBlockHeader.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/DataBlockHeader.java
@@ -22,7 +22,7 @@ import java.io.InputStream;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
/**
- * The data block header in code storage for each data block.
+ * The data block header in tiered storage for each data block.
*
* <p>Currently, It is in format:
* [ magic_word -- int ][ block_len -- int ][ first_entry_id -- long][padding]
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlock.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlock.java
index 9899740..01b610a 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlock.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlock.java
@@ -29,7 +29,7 @@ import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
* The Index block abstraction used for offload a ledger to long term storage.
*/
@Unstable
-public interface OffloadIndexBlock extends Closeable {
+public interface OffloadIndexBlock extends Closeable, OffloadIndexBlockV2 {
/**
* Get the content of the index block as InputStream.
@@ -86,5 +86,18 @@ public interface OffloadIndexBlock extends Closeable {
return streamSize;
}
}
+
+ default OffloadIndexEntry getIndexEntryForEntry(long ledgerId, long messageEntryId) throws IOException {
+ return getIndexEntryForEntry(messageEntryId);
+ }
+
+ default long getStartEntryId(long ledgerId) {
+ return 0; //Offload index block v1 always start with 0;
+ }
+
+ default LedgerMetadata getLedgerMetadata(long ledgerId) {
+ return getLedgerMetadata();
+ }
+
}
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlockBuilder.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlockBuilder.java
index 7c398e3..41a7009 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlockBuilder.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlockBuilder.java
@@ -23,7 +23,7 @@ import java.io.InputStream;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
-import org.apache.bookkeeper.mledger.offload.jcloud.impl.OffloadIndexBlockBuilderImpl;
+import org.apache.bookkeeper.mledger.offload.jcloud.impl.OffloadIndexBlockV2BuilderImpl;
/**
* Interface for builder of index block used for offload a ledger to long term storage.
@@ -71,12 +71,12 @@ public interface OffloadIndexBlockBuilder {
/**
* Construct OffloadIndex from an InputStream.
*/
- OffloadIndexBlock fromStream(InputStream is) throws IOException;
+ OffloadIndexBlockV2 fromStream(InputStream is) throws IOException;
/**
* create an OffloadIndexBlockBuilder.
*/
static OffloadIndexBlockBuilder create() {
- return new OffloadIndexBlockBuilderImpl();
+ return new OffloadIndexBlockV2BuilderImpl();
}
}
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlock.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlockV2.java
similarity index 72%
copy from tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlock.java
copy to tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlockV2.java
index 9899740..793d974 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlock.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlockV2.java
@@ -19,9 +19,7 @@
package org.apache.bookkeeper.mledger.offload.jcloud;
import java.io.Closeable;
-import java.io.FilterInputStream;
import java.io.IOException;
-import java.io.InputStream;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
@@ -29,7 +27,7 @@ import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
* The Index block abstraction used for offload a ledger to long term storage.
*/
@Unstable
-public interface OffloadIndexBlock extends Closeable {
+public interface OffloadIndexBlockV2 extends Closeable {
/**
* Get the content of the index block as InputStream.
@@ -37,7 +35,7 @@ public interface OffloadIndexBlock extends Closeable {
* | index_magic_header | index_block_len | index_entry_count |
* | data_object_size | segment_metadata_length | segment metadata | index entries ... |
*/
- IndexInputStream toStream() throws IOException;
+ OffloadIndexBlock.IndexInputStream toStream() throws IOException;
/**
* Get the related OffloadIndexEntry that contains the given messageEntryId.
@@ -46,7 +44,9 @@ public interface OffloadIndexBlock extends Closeable {
* the entry id of message
* @return the offload index entry
*/
- OffloadIndexEntry getIndexEntryForEntry(long messageEntryId) throws IOException;
+ OffloadIndexEntry getIndexEntryForEntry(long ledgerId, long messageEntryId) throws IOException;
+
+ public long getStartEntryId(long ledgerId);
/**
* Get the entry count that contained in this index Block.
@@ -55,8 +55,9 @@ public interface OffloadIndexBlock extends Closeable {
/**
* Get LedgerMetadata.
+ * @return
*/
- LedgerMetadata getLedgerMetadata();
+ LedgerMetadata getLedgerMetadata(long ledgerId);
/**
* Get the total size of the data object.
@@ -67,24 +68,5 @@ public interface OffloadIndexBlock extends Closeable {
* Get the length of the header in the blocks in the data object.
*/
long getDataBlockHeaderLength();
-
- /**
- * An input stream which knows the size of the stream upfront.
- */
- class IndexInputStream extends FilterInputStream {
- final long streamSize;
-
- public IndexInputStream(InputStream in, long streamSize) {
- super(in);
- this.streamSize = streamSize;
- }
-
- /**
- * @return the number of bytes in the stream.
- */
- public long getStreamSize() {
- return streamSize;
- }
- }
}
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlockBuilder.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlockV2Builder.java
similarity index 76%
copy from tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlockBuilder.java
copy to tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlockV2Builder.java
index 7c398e3..a79bc9f 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlockBuilder.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlockV2Builder.java
@@ -20,24 +20,25 @@ package org.apache.bookkeeper.mledger.offload.jcloud;
import java.io.IOException;
import java.io.InputStream;
-import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
-import org.apache.bookkeeper.mledger.offload.jcloud.impl.OffloadIndexBlockBuilderImpl;
+import org.apache.bookkeeper.mledger.offload.jcloud.impl.OffloadIndexBlockV2BuilderImpl;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
/**
* Interface for builder of index block used for offload a ledger to long term storage.
*/
@Unstable
@LimitedPrivate
-public interface OffloadIndexBlockBuilder {
+public interface OffloadIndexBlockV2Builder {
/**
* Build index block with the passed in ledger metadata.
*
+ * @param ledgerId
* @param metadata the ledger metadata
*/
- OffloadIndexBlockBuilder withLedgerMetadata(LedgerMetadata metadata);
+ OffloadIndexBlockV2Builder addLedgerMeta(Long ledgerId, LedgerInfo metadata);
/**
* Add one payload block related information into index block.
@@ -49,34 +50,34 @@ public interface OffloadIndexBlockBuilder {
* @param partId the payload block Id
* @param blockSize the payload block size
*/
- OffloadIndexBlockBuilder addBlock(long firstEntryId, int partId, int blockSize);
+ OffloadIndexBlockV2Builder addBlock(long ledgerId, long firstEntryId, int partId, int blockSize);
/**
* Specify the length of data object this index is associated with.
* @param dataObjectLength the length of the data object
*/
- OffloadIndexBlockBuilder withDataObjectLength(long dataObjectLength);
+ OffloadIndexBlockV2Builder withDataObjectLength(long dataObjectLength);
/**
* Specify the length of the block headers in the data object.
* @param dataHeaderLength the length of the headers
*/
- OffloadIndexBlockBuilder withDataBlockHeaderLength(long dataHeaderLength);
+ OffloadIndexBlockV2Builder withDataBlockHeaderLength(long dataHeaderLength);
/**
* Finalize the immutable OffloadIndexBlock.
*/
- OffloadIndexBlock build();
+ OffloadIndexBlockV2 buildV2();
/**
* Construct OffloadIndex from an InputStream.
*/
- OffloadIndexBlock fromStream(InputStream is) throws IOException;
+ OffloadIndexBlockV2 fromStream(InputStream is) throws IOException;
/**
* create an OffloadIndexBlockBuilder.
*/
- static OffloadIndexBlockBuilder create() {
- return new OffloadIndexBlockBuilderImpl();
+ static OffloadIndexBlockV2Builder create() {
+ return new OffloadIndexBlockV2BuilderImpl();
}
}
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexEntry.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexEntry.java
index e51e928..99d2206 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexEntry.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexEntry.java
@@ -23,8 +23,8 @@ import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
/**
* The Index Entry in OffloadIndexBlock.
- * It consists of the message entry id, the code storage block part id for this message entry,
- * and the offset in code storage block for this message id.
+ * It consists of the message entry id, the tiered storage block part id for this message entry,
+ * and the offset in tiered storage block for this message id.
*/
@Unstable
@LimitedPrivate
@@ -36,7 +36,7 @@ public interface OffloadIndexEntry {
long getEntryId();
/**
- * Get the block part id of code storage.
+ * Get the block part id of tiered storage.
*/
int getPartId();
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index 16368fa..e3fd181 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -21,12 +21,12 @@ package org.apache.bookkeeper.mledger.offload.jcloud.impl;
import io.netty.buffer.ByteBuf;
import java.io.DataInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
-import lombok.val;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -134,8 +134,7 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
nextExpectedId, entryId, lastEntry);
throw new BKException.BKUnexpectedConditionException();
} else {
- val skipped = inputStream.skip(length);
- log.info("Skipped {} bytes.", skipped);
+ long ignored = inputStream.skip(length);
}
}
@@ -195,12 +194,14 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
Blob blob = blobStore.getBlob(bucket, indexKey);
versionCheck.check(indexKey, blob);
OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create();
- OffloadIndexBlock index = indexBuilder.fromStream(blob.getPayload().openStream());
+ final InputStream payLoadStream = blob.getPayload().openStream();
+ OffloadIndexBlock index = (OffloadIndexBlock) indexBuilder.fromStream(payLoadStream);
+ payLoadStream.close();
BackedInputStream inputStream = new BlobStoreBackedInputStreamImpl(blobStore, bucket, key,
- versionCheck,
- index.getDataObjectLength(),
- readBufferSize);
+ versionCheck,
+ index.getDataObjectLength(),
+ readBufferSize);
return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, executor);
}
}
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
new file mode 100644
index 0000000..d8749f3
--- /dev/null
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import lombok.val;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2Builder;
+import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.domain.Blob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
+ private static final Logger log = LoggerFactory.getLogger(BlobStoreBackedReadHandleImplV2.class);
+
+ private final long ledgerId;
+ private final List<OffloadIndexBlockV2> indices;
+ private final List<BackedInputStream> inputStreams;
+ private final List<DataInputStream> dataStreams;
+ private final ExecutorService executor;
+
+ static class GroupedReader {
+ @Override
+ public String toString() {
+ return "GroupedReader{" +
+ "ledgerId=" + ledgerId +
+ ", firstEntry=" + firstEntry +
+ ", lastEntry=" + lastEntry +
+ '}';
+ }
+
+ public final long ledgerId;
+ public final long firstEntry;
+ public final long lastEntry;
+ OffloadIndexBlockV2 index;
+ BackedInputStream inputStream;
+ DataInputStream dataStream;
+
+ public GroupedReader(long ledgerId, long firstEntry, long lastEntry,
+ OffloadIndexBlockV2 index,
+ BackedInputStream inputStream, DataInputStream dataStream) {
+ this.ledgerId = ledgerId;
+ this.firstEntry = firstEntry;
+ this.lastEntry = lastEntry;
+ this.index = index;
+ this.inputStream = inputStream;
+ this.dataStream = dataStream;
+ }
+ }
+
+ private BlobStoreBackedReadHandleImplV2(long ledgerId, List<OffloadIndexBlockV2> indices,
+ List<BackedInputStream> inputStreams,
+ ExecutorService executor) {
+ this.ledgerId = ledgerId;
+ this.indices = indices;
+ this.inputStreams = inputStreams;
+ this.dataStreams = new LinkedList<>();
+ for (BackedInputStream inputStream : inputStreams) {
+ dataStreams.add(new DataInputStream(inputStream));
+ }
+ this.executor = executor;
+ }
+
+ @Override
+ public long getId() {
+ return ledgerId;
+ }
+
+ @Override
+ public LedgerMetadata getLedgerMetadata() {
+ //get the most complete one
+ return indices.get(indices.size() - 1).getLedgerMetadata(ledgerId);
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ executor.submit(() -> {
+ try {
+ for (OffloadIndexBlockV2 indexBlock : indices) {
+ indexBlock.close();
+ }
+ for (DataInputStream dataStream : dataStreams) {
+ dataStream.close();
+ }
+ promise.complete(null);
+ } catch (IOException t) {
+ promise.completeExceptionally(t);
+ }
+ });
+ return promise;
+ }
+
+ @Override
+ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
+ log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
+ CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
+ if (firstEntry > lastEntry
+ || firstEntry < 0
+ || lastEntry > getLastAddConfirmed()) {
+ promise.completeExceptionally(new IllegalArgumentException());
+ return promise;
+ }
+ executor.submit(() -> {
+ List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
+ List<GroupedReader> groupedReaders = null;
+ try {
+ groupedReaders = getGroupedReader(firstEntry, lastEntry);
+ } catch (Exception e) {
+ promise.completeExceptionally(e);
+ return;
+ }
+
+ for (GroupedReader groupedReader : groupedReaders) {
+ long entriesToRead = (groupedReader.lastEntry - groupedReader.firstEntry) + 1;
+ long nextExpectedId = groupedReader.firstEntry;
+ try {
+ while (entriesToRead > 0) {
+ int length = groupedReader.dataStream.readInt();
+ if (length < 0) { // hit padding or new block
+ groupedReader.inputStream
+ .seek(groupedReader.index
+ .getIndexEntryForEntry(groupedReader.ledgerId, nextExpectedId)
+ .getDataOffset());
+ continue;
+ }
+ long entryId = groupedReader.dataStream.readLong();
+
+ if (entryId == nextExpectedId) {
+ ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length);
+ entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
+ int toWrite = length;
+ while (toWrite > 0) {
+ toWrite -= buf.writeBytes(groupedReader.dataStream, toWrite);
+ }
+ entriesToRead--;
+ nextExpectedId++;
+ } else if (entryId > nextExpectedId) {
+ groupedReader.inputStream
+ .seek(groupedReader.index
+ .getIndexEntryForEntry(groupedReader.ledgerId, nextExpectedId)
+ .getDataOffset());
+ continue;
+ } else if (entryId < nextExpectedId
+ && !groupedReader.index.getIndexEntryForEntry(groupedReader.ledgerId, nextExpectedId)
+ .equals(
+ groupedReader.index.getIndexEntryForEntry(groupedReader.ledgerId, entryId))) {
+ groupedReader.inputStream
+ .seek(groupedReader.index
+ .getIndexEntryForEntry(groupedReader.ledgerId, nextExpectedId)
+ .getDataOffset());
+ continue;
+ } else if (entryId > groupedReader.lastEntry) {
+ log.info("Expected to read {}, but read {}, which is greater than last entry {}",
+ nextExpectedId, entryId, groupedReader.lastEntry);
+ throw new BKException.BKUnexpectedConditionException();
+ } else {
+ val skipped = groupedReader.inputStream.skip(length);
+ }
+ }
+ } catch (Throwable t) {
+ promise.completeExceptionally(t);
+ entries.forEach(LedgerEntry::close);
+ }
+
+ promise.complete(LedgerEntriesImpl.create(entries));
+ }
+ });
+ return promise;
+ }
+
+ private List<GroupedReader> getGroupedReader(long firstEntry, long lastEntry) throws Exception {
+ List<GroupedReader> groupedReaders = new LinkedList<>();
+ for (int i = indices.size() - 1; i >= 0 && firstEntry <= lastEntry; i--) {
+ final OffloadIndexBlockV2 index = indices.get(i);
+ final long startEntryId = index.getStartEntryId(ledgerId);
+ if (startEntryId > lastEntry) {
+ log.debug("entries are in earlier indices, skip this segment ledger id: {}, begin entry id: {}",
+ ledgerId, startEntryId);
+ } else {
+ groupedReaders.add(new GroupedReader(ledgerId, startEntryId, lastEntry, index, inputStreams.get(i),
+ dataStreams.get(i)));
+ lastEntry = startEntryId - 1;
+ }
+ }
+
+ Preconditions.checkArgument(firstEntry > lastEntry);
+ for (int i = 0; i < groupedReaders.size() - 1; i++) {
+ final GroupedReader readerI = groupedReaders.get(i);
+ final GroupedReader readerII = groupedReaders.get(i + 1);
+ Preconditions.checkArgument(readerI.ledgerId == readerII.ledgerId);
+ Preconditions.checkArgument(readerI.firstEntry >= readerII.lastEntry);
+ }
+ return groupedReaders;
+ }
+
+ @Override
+ public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) {
+ return readAsync(firstEntry, lastEntry);
+ }
+
+ @Override
+ public CompletableFuture<Long> readLastAddConfirmedAsync() {
+ return CompletableFuture.completedFuture(getLastAddConfirmed());
+ }
+
+ @Override
+ public CompletableFuture<Long> tryReadLastAddConfirmedAsync() {
+ return CompletableFuture.completedFuture(getLastAddConfirmed());
+ }
+
+ @Override
+ public long getLastAddConfirmed() {
+ return getLedgerMetadata().getLastEntryId();
+ }
+
+ @Override
+ public long getLength() {
+ return getLedgerMetadata().getLength();
+ }
+
+ @Override
+ public boolean isClosed() {
+ return getLedgerMetadata().isClosed();
+ }
+
+ @Override
+ public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId,
+ long timeOutInMillis,
+ boolean parallel) {
+ CompletableFuture<LastConfirmedAndEntry> promise = new CompletableFuture<>();
+ promise.completeExceptionally(new UnsupportedOperationException());
+ return promise;
+ }
+
+ public static ReadHandle open(ScheduledExecutorService executor,
+ BlobStore blobStore, String bucket, List<String> keys, List<String> indexKeys,
+ VersionCheck versionCheck,
+ long ledgerId, int readBufferSize)
+ throws IOException {
+ List<BackedInputStream> inputStreams = new LinkedList<>();
+ List<OffloadIndexBlockV2> indice = new LinkedList<>();
+ for (int i = 0; i < indexKeys.size(); i++) {
+ String indexKey = indexKeys.get(i);
+ String key = keys.get(i);
+ log.debug("open bucket: {} index key: {}", bucket, indexKey);
+ Blob blob = blobStore.getBlob(bucket, indexKey);
+ log.debug("indexKey blob: {} {}", indexKey, blob);
+ versionCheck.check(indexKey, blob);
+ OffloadIndexBlockV2Builder indexBuilder = OffloadIndexBlockV2Builder.create();
+ final InputStream payloadStream = blob.getPayload().openStream();
+ OffloadIndexBlockV2 index = indexBuilder.fromStream(payloadStream);
+ payloadStream.close();
+
+ BackedInputStream inputStream = new BlobStoreBackedInputStreamImpl(blobStore, bucket, key,
+ versionCheck,
+ index.getDataObjectLength(),
+ readBufferSize);
+ inputStreams.add(inputStream);
+ indice.add(index);
+ }
+ return new BlobStoreBackedReadHandleImplV2(ledgerId, indice, inputStreams, executor);
+ }
+}
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index 8607e00..c76864d 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -22,22 +22,39 @@ import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.io.IOException;
+import java.time.Duration;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.LedgerOffloader.OffloadHandle.OfferEntryResult;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.OffloadSegmentInfoImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.offload.jcloud.BlockAwareSegmentInputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock.IndexInputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2Builder;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.BlobStoreLocation;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.domain.Blob;
@@ -50,6 +67,7 @@ import org.jclouds.domain.LocationBuilder;
import org.jclouds.domain.LocationScope;
import org.jclouds.io.Payload;
import org.jclouds.io.Payloads;
+import org.jclouds.io.payloads.InputStreamPayload;
/**
* Tiered Storage Offloader that is backed by a JCloud Blob Store.
@@ -70,10 +88,24 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
private final Map<String, String> userMetadata;
private final ConcurrentMap<BlobStoreLocation, BlobStore> blobStores = new ConcurrentHashMap<>();
+ private OffloadSegmentInfoImpl segmentInfo;
+ private AtomicLong bufferLength = new AtomicLong(0);
+ private AtomicLong segmentLength = new AtomicLong(0);
+ final private long maxBufferLength;
+ final private ConcurrentLinkedQueue<Entry> offloadBuffer = new ConcurrentLinkedQueue<>();
+ private CompletableFuture<OffloadResult> offloadResult;
+ private volatile PositionImpl lastOfferedPosition = PositionImpl.latest;
+ private final Duration maxSegmentCloseTime;
+ private final long minSegmentCloseTimeMillis;
+ private final long segmentBeginTimeMillis;
+ private final long maxSegmentLength;
+ private final int streamingBlockSize;
+ private volatile ManagedLedger ml;
+ private OffloadIndexBlockV2Builder streamingIndexBuilder;
public static BlobStoreManagedLedgerOffloader create(TieredStorageConfiguration config,
- Map<String, String> userMetadata,
- OrderedScheduler scheduler) throws IOException {
+ Map<String, String> userMetadata,
+ OrderedScheduler scheduler) throws IOException {
return new BlobStoreManagedLedgerOffloader(config, scheduler, userMetadata);
}
@@ -84,13 +116,20 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
this.scheduler = scheduler;
this.userMetadata = userMetadata;
this.config = config;
+ this.streamingBlockSize = config.getMinBlockSizeInBytes();
+ this.maxSegmentCloseTime = Duration.ofSeconds(config.getMaxSegmentTimeInSecond());
+ this.maxSegmentLength = config.getMaxSegmentSizeInBytes();
+ this.minSegmentCloseTimeMillis = Duration.ofSeconds(config.getMinSegmentTimeInSecond()).toMillis();
+ //ensure buffer can have enough content to fill a block
+ this.maxBufferLength = Math.max(config.getWriteBufferSizeInBytes(), config.getMinBlockSizeInBytes());
+ this.segmentBeginTimeMillis = System.currentTimeMillis();
if (!Strings.isNullOrEmpty(config.getRegion())) {
this.writeLocation = new LocationBuilder()
- .scope(LocationScope.REGION)
- .id(config.getRegion())
- .description(config.getRegion())
- .build();
+ .scope(LocationScope.REGION)
+ .id(config.getRegion())
+ .description(config.getRegion())
+ .build();
} else {
this.writeLocation = null;
}
@@ -201,7 +240,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
// upload index block
try (OffloadIndexBlock index = indexBuilder.withDataObjectLength(dataObjectLength).build();
- OffloadIndexBlock.IndexInputStream indexStream = index.toStream()) {
+ IndexInputStream indexStream = index.toStream()) {
// write the index block
BlobBuilder blobBuilder = writeBlobStore.blobBuilder(indexBlockKey);
DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
@@ -210,8 +249,8 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
indexPayload.getContentMetadata().setContentType("application/octet-stream");
Blob blob = blobBuilder
- .payload(indexPayload)
- .contentLength((long) indexStream.getStreamSize())
+ .payload(indexPayload)
+ .contentLength((long) indexStream.getStreamSize())
.build();
writeBlobStore.putBlob(config.getBucket(), blob);
@@ -230,6 +269,214 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
return promise;
}
+ BlobStore blobStore;
+ String streamingDataBlockKey;
+ String streamingDataIndexKey;
+ MultipartUpload streamingMpu = null;
+ List<MultipartPart> streamingParts = Lists.newArrayList();
+
+ @Override
+ public CompletableFuture<OffloadHandle> streamingOffload(@NonNull ManagedLedger ml, UUID uuid, long beginLedger,
+ long beginEntry,
+ Map<String, String> driverMetadata) {
+ if (this.ml != null) {
+ log.error("streamingOffload should only be called once");
+ final CompletableFuture<OffloadHandle> result = new CompletableFuture<>();
+ result.completeExceptionally(new RuntimeException("streamingOffload should only be called once"));
+ }
+
+ this.ml = ml;
+ this.segmentInfo = new OffloadSegmentInfoImpl(uuid, beginLedger, beginEntry, config.getDriver(),
+ driverMetadata);
+ log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+ this.offloadResult = new CompletableFuture<>();
+ blobStore = blobStores.get(config.getBlobStoreLocation());
+ streamingIndexBuilder = OffloadIndexBlockV2Builder.create();
+ streamingDataBlockKey = segmentInfo.uuid.toString();
+ streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
+ BlobBuilder blobBuilder = blobStore.blobBuilder(streamingDataBlockKey);
+ DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
+ Blob blob = blobBuilder.build();
+ streamingMpu = blobStore
+ .initiateMultipartUpload(config.getBucket(), blob.getMetadata(), new PutOptions());
+
+ scheduler.chooseThread(segmentInfo).execute(() -> {
+ log.info("start offloading segment: {}", segmentInfo);
+ streamingOffloadLoop(1, 0);
+ });
+ scheduler.schedule(this::closeSegment, maxSegmentCloseTime.toMillis(), TimeUnit.MILLISECONDS);
+
+ return CompletableFuture.completedFuture(new OffloadHandle() {
+ @Override
+ public Position lastOffered() {
+ return BlobStoreManagedLedgerOffloader.this.lastOffered();
+ }
+
+ @Override
+ public CompletableFuture<Position> lastOfferedAsync() {
+ return CompletableFuture.completedFuture(lastOffered());
+ }
+
+ @Override
+ public OfferEntryResult offerEntry(Entry entry) {
+ return BlobStoreManagedLedgerOffloader.this.offerEntry(entry);
+ }
+
+ @Override
+ public CompletableFuture<OfferEntryResult> offerEntryAsync(Entry entry) {
+ return CompletableFuture.completedFuture(offerEntry(entry));
+ }
+
+ @Override
+ public CompletableFuture<OffloadResult> getOffloadResultAsync() {
+ return BlobStoreManagedLedgerOffloader.this.getOffloadResultAsync();
+ }
+
+ @Override
+ public boolean close() {
+ return BlobStoreManagedLedgerOffloader.this.closeSegment();
+ }
+ });
+ }
+
+ private void streamingOffloadLoop(int partId, int dataObjectLength) {
+ log.debug("streaming offload loop {} {}", partId, dataObjectLength);
+ if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+ buildIndexAndCompleteResult(dataObjectLength);
+ offloadResult.complete(segmentInfo.result());
+ } else if ((segmentInfo.isClosed() && !offloadBuffer.isEmpty())
+ // last time to build and upload block
+ || bufferLength.get() >= streamingBlockSize
+ // buffer size full, build and upload block
+ ) {
+ List<Entry> entries = new LinkedList<>();
+ int blockEntrySize = 0;
+ final Entry firstEntry = offloadBuffer.poll();
+ entries.add(firstEntry);
+ long blockLedgerId = firstEntry.getLedgerId();
+ long blockEntryId = firstEntry.getEntryId();
+
+ while (!offloadBuffer.isEmpty() && offloadBuffer.peek().getLedgerId() == blockLedgerId
+ && blockEntrySize <= streamingBlockSize) {
+ final Entry entryInBlock = offloadBuffer.poll();
+ final int entrySize = entryInBlock.getLength();
+ bufferLength.addAndGet(-entrySize);
+ blockEntrySize += entrySize;
+ entries.add(entryInBlock);
+ }
+ final int blockSize = BufferedOffloadStream
+ .calculateBlockSize(streamingBlockSize, entries.size(), blockEntrySize);
+ buildBlockAndUpload(blockSize, entries, blockLedgerId, blockEntryId, partId);
+ streamingOffloadLoop(partId + 1, dataObjectLength + blockSize);
+ } else {
+ log.debug("not enough data, delay schedule for part: {} length: {}", partId, dataObjectLength);
+ scheduler.chooseThread(segmentInfo)
+ .schedule(() -> {
+ streamingOffloadLoop(partId, dataObjectLength);
+ }, 100, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ private void buildBlockAndUpload(int blockSize, List<Entry> entries, long blockLedgerId, long beginEntryId,
+ int partId) {
+ try (final BufferedOffloadStream payloadStream = new BufferedOffloadStream(blockSize, entries,
+ blockLedgerId, beginEntryId)) {
+ log.debug("begin upload payload: {} {}", blockLedgerId, beginEntryId);
+ Payload partPayload = Payloads.newInputStreamPayload(payloadStream);
+ partPayload.getContentMetadata().setContentType("application/octet-stream");
+ streamingParts.add(blobStore.uploadMultipartPart(streamingMpu, partId, partPayload));
+ streamingIndexBuilder.withDataBlockHeaderLength(StreamingDataBlockHeaderImpl.getDataStartOffset());
+ streamingIndexBuilder.addBlock(blockLedgerId, beginEntryId, partId, blockSize);
+ final MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ml.getLedgerInfo(blockLedgerId).get();
+ final MLDataFormats.ManagedLedgerInfo.LedgerInfo.Builder ledgerInfoBuilder = MLDataFormats.ManagedLedgerInfo.LedgerInfo
+ .newBuilder();
+ if (ledgerInfo != null) {
+ ledgerInfoBuilder.mergeFrom(ledgerInfo);
+ }
+ if (ledgerInfoBuilder.getEntries() == 0) {
+ //ledger unclosed, use last entry id of the block
+ ledgerInfoBuilder.setEntries(payloadStream.getEndEntryId() + 1);
+ }
+ streamingIndexBuilder.addLedgerMeta(blockLedgerId, ledgerInfoBuilder.build());
+ log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}",
+ config.getBucket(), streamingDataBlockKey, partId, streamingMpu.id());
+ } catch (Throwable e) {
+ blobStore.abortMultipartUpload(streamingMpu);
+ offloadResult.completeExceptionally(e);
+ return;
+ }
+ }
+
+ private void buildIndexAndCompleteResult(long dataObjectLength) {
+ try {
+ blobStore.completeMultipartUpload(streamingMpu, streamingParts);
+ streamingIndexBuilder.withDataObjectLength(dataObjectLength);
+ final OffloadIndexBlockV2 index = streamingIndexBuilder.buildV2();
+ final IndexInputStream indexStream = index.toStream();
+ final BlobBuilder indexBlobBuilder = blobStore.blobBuilder(streamingDataIndexKey);
+ streamingIndexBuilder.withDataBlockHeaderLength(StreamingDataBlockHeaderImpl.getDataStartOffset());
+
+ DataBlockUtils.addVersionInfo(indexBlobBuilder, userMetadata);
+ try (final InputStreamPayload indexPayLoad = Payloads.newInputStreamPayload(indexStream)) {
+ indexPayLoad.getContentMetadata().setContentLength(indexStream.getStreamSize());
+ indexPayLoad.getContentMetadata().setContentType("application/octet-stream");
+ final Blob indexBlob = indexBlobBuilder.payload(indexPayLoad)
+ .contentLength(indexStream.getStreamSize())
+ .build();
+ blobStore.putBlob(config.getBucket(), indexBlob);
+
+ final OffloadResult result = segmentInfo.result();
+ offloadResult.complete(result);
+ log.debug("offload segment completed {}", result);
+ } catch (Exception e) {
+ log.error("streaming offload failed", e);
+ offloadResult.completeExceptionally(e);
+ }
+ } catch (Exception e) {
+ log.error("streaming offload failed", e);
+ offloadResult.completeExceptionally(e);
+ }
+ }
+
+ private CompletableFuture<OffloadResult> getOffloadResultAsync() {
+ return this.offloadResult;
+ }
+
+ private synchronized OfferEntryResult offerEntry(Entry entry) {
+
+ if (segmentInfo.isClosed()) {
+ log.debug("Segment already closed {}", segmentInfo);
+ return OfferEntryResult.FAIL_SEGMENT_CLOSED;
+ } else if (maxBufferLength <= bufferLength.get()) {
+ //buffer length can over fill maxBufferLength a bit with the last entry
+ //to prevent insufficient content to build a block
+ return OfferEntryResult.FAIL_BUFFER_FULL;
+ } else {
+ final EntryImpl entryImpl = EntryImpl
+ .create(entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer());
+ offloadBuffer.add(entryImpl);
+ bufferLength.getAndAdd(entryImpl.getLength());
+ segmentLength.getAndAdd(entryImpl.getLength());
+ lastOfferedPosition = entryImpl.getPosition();
+ if (segmentLength.get() >= maxSegmentLength
+ && System.currentTimeMillis() - segmentBeginTimeMillis >= minSegmentCloseTimeMillis) {
+ closeSegment();
+ }
+ return OfferEntryResult.SUCCESS;
+ }
+ }
+
+ private synchronized boolean closeSegment() {
+ final boolean result = !segmentInfo.isClosed();
+ log.debug("close segment {} {}", lastOfferedPosition.getLedgerId(), lastOfferedPosition.getEntryId());
+ this.segmentInfo.closeSegment(lastOfferedPosition.getLedgerId(), lastOfferedPosition.getEntryId());
+ return result;
+ }
+
+ private PositionImpl lastOffered() {
+ return lastOfferedPosition;
+ }
+
/**
* Attempts to create a BlobStoreLocation from the values in the offloadDriverMetadata,
* however, if no values are available, it defaults to the currently configured
@@ -255,17 +502,50 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
String key = DataBlockUtils.dataBlockOffloadKey(ledgerId, uid);
String indexKey = DataBlockUtils.indexBlockOffloadKey(ledgerId, uid);
scheduler.chooseThread(ledgerId).submit(() -> {
- try {
- promise.complete(BlobStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
- readBlobstore,
- readBucket, key, indexKey,
- DataBlockUtils.VERSION_CHECK,
- ledgerId, config.getReadBufferSizeInBytes()));
- } catch (Throwable t) {
- log.error("Failed readOffloaded: ", t);
- promise.completeExceptionally(t);
- }
- });
+ try {
+ promise.complete(BlobStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
+ readBlobstore,
+ readBucket, key, indexKey,
+ DataBlockUtils.VERSION_CHECK,
+ ledgerId, config.getReadBufferSizeInBytes()));
+ } catch (Throwable t) {
+ log.error("Failed readOffloaded: ", t);
+ promise.completeExceptionally(t);
+ }
+ });
+ return promise;
+ }
+
+ @Override
+ public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, MLDataFormats.OffloadContext ledgerContext,
+ Map<String, String> offloadDriverMetadata) {
+ BlobStoreLocation bsKey = getBlobStoreLocation(offloadDriverMetadata);
+ String readBucket = bsKey.getBucket();
+ BlobStore readBlobstore = blobStores.get(config.getBlobStoreLocation());
+ CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
+ final List<MLDataFormats.OffloadSegment> offloadSegmentList = ledgerContext.getOffloadSegmentList();
+ List<String> keys = Lists.newLinkedList();
+ List<String> indexKeys = Lists.newLinkedList();
+ offloadSegmentList.forEach(seg -> {
+ final UUID uuid = new UUID(seg.getUidMsb(), seg.getUidLsb());
+ final String key = uuid.toString();
+ final String indexKey = DataBlockUtils.indexBlockOffloadKey(uuid);
+ keys.add(key);
+ indexKeys.add(indexKey);
+ });
+
+ scheduler.chooseThread(ledgerId).submit(() -> {
+ try {
+ promise.complete(BlobStoreBackedReadHandleImplV2.open(scheduler.chooseThread(ledgerId),
+ readBlobstore,
+ readBucket, keys, indexKeys,
+ DataBlockUtils.VERSION_CHECK,
+ ledgerId, config.getReadBufferSizeInBytes()));
+ } catch (Throwable t) {
+ log.error("Failed readOffloaded: ", t);
+ promise.completeExceptionally(t);
+ }
+ });
return promise;
}
@@ -292,6 +572,27 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
return promise;
}
+ @Override
+ public CompletableFuture<Void> deleteOffloaded(UUID uid, Map<String, String> offloadDriverMetadata) {
+ BlobStoreLocation bsKey = getBlobStoreLocation(offloadDriverMetadata);
+ String readBucket = bsKey.getBucket(offloadDriverMetadata);
+ BlobStore readBlobstore = blobStores.get(config.getBlobStoreLocation());
+
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ scheduler.submit(() -> {
+ try {
+ readBlobstore.removeBlobs(readBucket,
+ ImmutableList.of(uid.toString(),
+ DataBlockUtils.indexBlockOffloadKey(uid)));
+ promise.complete(null);
+ } catch (Throwable t) {
+ log.error("Failed delete Blob", t);
+ promise.completeExceptionally(t);
+ }
+ });
+
+ return promise;
+ }
@Override
public OffloadPolicies getOffloadPolicies() {
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
index dcc693a..a4ffdea 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
@@ -19,7 +19,6 @@
package org.apache.bookkeeper.mledger.offload.jcloud.impl;
import static com.google.common.base.Preconditions.checkState;
-
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
@@ -44,7 +43,7 @@ import org.slf4j.LoggerFactory;
public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStream {
private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStreamImpl.class);
- private static final int[] BLOCK_END_PADDING = new int[] { 0xFE, 0xDC, 0xDE, 0xAD };
+ static final int[] BLOCK_END_PADDING = new int[]{ 0xFE, 0xDC, 0xDE, 0xAD };
private final ReadHandle ledger;
private final long startEntryId;
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BufferedOffloadStream.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BufferedOffloadStream.java
new file mode 100644
index 0000000..3a323ce
--- /dev/null
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BufferedOffloadStream.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+
+@Slf4j
+public class BufferedOffloadStream extends InputStream {
+ static final int[] BLOCK_END_PADDING = BlockAwareSegmentInputStreamImpl.BLOCK_END_PADDING;
+
+ private final long ledgerId;
+ private final long beginEntryId;
+
+ public BufferedOffloadStream(int blockSize, List<Entry> entries, long ledgerId, long beginEntryId) {
+ this.ledgerId = ledgerId;
+ this.beginEntryId = beginEntryId;
+ this.endEntryId = beginEntryId;
+ this.blockSize = blockSize;
+ this.entryBuffer = entries;
+ this.blockHead = StreamingDataBlockHeaderImpl.of(blockSize, ledgerId, beginEntryId)
+ .toStream();
+ }
+
+ public long getEndEntryId() {
+ return endEntryId;
+ }
+
+ private volatile long endEntryId;
+ static final int ENTRY_HEADER_SIZE = 4 /* entry size */ + 8 /* entry id */;
+ private final long blockSize;
+ private final List<Entry> entryBuffer;
+ private final InputStream blockHead;
+ int offset = 0;
+ static final int NOT_INITIALIZED = -1;
+ int validDataOffset = NOT_INITIALIZED;
+ CompositeByteBuf currentEntry;
+
+ public long getLedgerId() {
+ return ledgerId;
+ }
+
+ public long getBeginEntryId() {
+ return beginEntryId;
+ }
+
+ public long getBlockSize() {
+ return blockSize;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (blockHead.available() > 0) {
+ offset++;
+ return blockHead.read();
+ }
+ //if current exists, use current first
+ if (currentEntry != null) {
+ if (currentEntry.readableBytes() > 0) {
+ offset += 1;
+ return currentEntry.readUnsignedByte();
+ } else {
+ currentEntry.release();
+ currentEntry = null;
+ }
+ }
+
+ if (blockSize <= offset) {
+ return -1;
+ } else if (validDataOffset != NOT_INITIALIZED) {
+ return BLOCK_END_PADDING[(offset++ - validDataOffset) % BLOCK_END_PADDING.length];
+ }
+
+
+ if (entryBuffer.isEmpty()) {
+ validDataOffset = offset;
+ return read();
+ }
+
+ Entry headEntry = entryBuffer.remove(0);
+
+ //create new block when a ledger end
+ if (headEntry.getLedgerId() != this.ledgerId) {
+ throw new RuntimeException(
+ String.format("there should not be multi ledger in a block %s %s", headEntry.getLedgerId(),
+ this.ledgerId));
+ }
+
+ final int entryLength = headEntry.getLength();
+ final long entryId = headEntry.getEntryId();
+ CompositeByteBuf entryBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(2);
+ ByteBuf entryHeaderBuf = PulsarByteBufAllocator.DEFAULT.buffer(ENTRY_HEADER_SIZE, ENTRY_HEADER_SIZE);
+ entryHeaderBuf.writeInt(entryLength).writeLong(entryId);
+ entryBuf.addComponents(true, entryHeaderBuf, headEntry.getDataBuffer().retain());
+ endEntryId = headEntry.getEntryId();
+ headEntry.release();
+ currentEntry = entryBuf;
+ return read();
+
+ }
+
+ @Override
+ public void close() throws IOException {
+ blockHead.close();
+ }
+
+ public static int calculateBlockSize(int streamingBlockSize, int entryCount, int entrySize) {
+ int validDataSize = (entryCount * ENTRY_HEADER_SIZE
+ + entrySize
+ + StreamingDataBlockHeaderImpl.getDataStartOffset());
+ return Math.max(streamingBlockSize, validDataSize);
+ }
+}
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java
index 9239ec2..cd46c52 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java
@@ -19,10 +19,8 @@
package org.apache.bookkeeper.mledger.offload.jcloud.impl;
import com.google.common.io.CountingInputStream;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
-import io.netty.buffer.PooledByteBufAllocator;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
@@ -31,7 +29,7 @@ import org.apache.bookkeeper.mledger.offload.jcloud.DataBlockHeader;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
/**
- * The data block header in code storage for each data block.
+ * The data block header in tiered storage for each data block.
*/
public class DataBlockHeaderImpl implements DataBlockHeader {
// Magic Word for data block.
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockUtils.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockUtils.java
index abbb032..440221f 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockUtils.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockUtils.java
@@ -19,11 +19,9 @@
package org.apache.bookkeeper.mledger.offload.jcloud.impl;
import com.google.common.collect.ImmutableMap;
-
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
-
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.BlobBuilder;
@@ -59,6 +57,10 @@ public class DataBlockUtils {
return String.format("%s-ledger-%d-index", uuid.toString(), ledgerId);
}
+ public static String indexBlockOffloadKey(UUID uuid) {
+ return String.format("%s-index", uuid.toString());
+ }
+
public static void addVersionInfo(BlobBuilder blobBuilder, Map<String, String> userMetadata) {
ImmutableMap.Builder<String, String> metadataBuilder = ImmutableMap.builder();
metadataBuilder.putAll(userMetadata);
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockBuilderImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockBuilderImpl.java
deleted file mode 100644
index 42f4567..0000000
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockBuilderImpl.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.bookkeeper.mledger.offload.jcloud.impl;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-import org.apache.bookkeeper.client.api.LedgerMetadata;
-import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
-import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
-
-/**
- * Interface for builder of index block used for offload a ledger to long term storage.
- */
-public class OffloadIndexBlockBuilderImpl implements OffloadIndexBlockBuilder {
-
- private LedgerMetadata ledgerMetadata;
- private long dataObjectLength;
- private long dataHeaderLength;
- private List<OffloadIndexEntryImpl> entries;
- private int lastBlockSize;
-
- public OffloadIndexBlockBuilderImpl() {
- this.entries = Lists.newArrayList();
- }
-
- @Override
- public OffloadIndexBlockBuilder withDataObjectLength(long dataObjectLength) {
- this.dataObjectLength = dataObjectLength;
- return this;
- }
-
- @Override
- public OffloadIndexBlockBuilder withDataBlockHeaderLength(long dataHeaderLength) {
- this.dataHeaderLength = dataHeaderLength;
- return this;
- }
-
- @Override
- public OffloadIndexBlockBuilder withLedgerMetadata(LedgerMetadata metadata) {
- this.ledgerMetadata = metadata;
- return this;
- }
-
- @Override
- public OffloadIndexBlockBuilder addBlock(long firstEntryId, int partId, int blockSize) {
- checkState(dataHeaderLength > 0);
-
- // we should added one by one.
- long offset;
- if (firstEntryId == 0) {
- checkState(entries.size() == 0);
- offset = 0;
- } else {
- checkState(entries.size() > 0);
- offset = entries.get(entries.size() - 1).getOffset() + lastBlockSize;
- }
- lastBlockSize = blockSize;
-
- this.entries.add(OffloadIndexEntryImpl.of(firstEntryId, partId, offset, dataHeaderLength));
- return this;
- }
-
- @Override
- public OffloadIndexBlock fromStream(InputStream is) throws IOException {
- return OffloadIndexBlockImpl.get(is);
- }
-
- @Override
- public OffloadIndexBlock build() {
- checkState(ledgerMetadata != null);
- checkState(!entries.isEmpty());
- checkState(dataObjectLength > 0);
- checkState(dataHeaderLength > 0);
- return OffloadIndexBlockImpl.get(ledgerMetadata, dataObjectLength, dataHeaderLength, entries);
- }
-
-}
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
index 864a942..e525871 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
@@ -27,7 +27,6 @@ import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import java.io.DataInputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -80,7 +79,11 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
return block;
}
- public static OffloadIndexBlockImpl get(InputStream stream) throws IOException {
+ public static OffloadIndexBlockImpl get(int magic, DataInputStream stream) throws IOException {
+ if (magic != INDEX_MAGIC_WORD) {
+ throw new IOException(String.format("Invalid MagicWord. read: 0x%x expected: 0x%x",
+ magic, INDEX_MAGIC_WORD));
+ }
OffloadIndexBlockImpl block = RECYCLER.get();
block.indexEntries = Maps.newTreeMap();
block.fromStream(stream);
@@ -164,7 +167,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
// write entries
this.indexEntries.entrySet().forEach(entry ->
- out.writeLong(entry.getValue().getEntryId())
+ out.writeLong(entry.getValue().getEntryId())
.writeInt(entry.getValue().getPartId())
.writeLong(entry.getValue().getOffset()));
@@ -198,7 +201,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
if (ledgerMetadataFormat.getCustomMetadataCount() > 0) {
ledgerMetadataFormat.getCustomMetadataList().forEach(
- entry -> this.customMetadata.put(entry.getKey(), entry.getValue().toByteArray()));
+ entry -> this.customMetadata.put(entry.getKey(), entry.getValue().toByteArray()));
}
ledgerMetadataFormat.getSegmentList().forEach(segment -> {
@@ -327,13 +330,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
return new InternalLedgerMetadata(builder.build());
}
- private OffloadIndexBlock fromStream(InputStream stream) throws IOException {
- DataInputStream dis = new DataInputStream(stream);
- int magic = dis.readInt();
- if (magic != this.INDEX_MAGIC_WORD) {
- throw new IOException(String.format("Invalid MagicWord. read: 0x%x expected: 0x%x",
- magic, INDEX_MAGIC_WORD));
- }
+ private OffloadIndexBlock fromStream(DataInputStream dis) throws IOException {
dis.readInt(); // no used index block length
this.dataObjectLength = dis.readLong();
this.dataHeaderLength = dis.readLong();
@@ -351,9 +348,8 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
for (int i = 0; i < indexEntryCount; i++) {
long entryId = dis.readLong();
this.indexEntries.putIfAbsent(entryId, OffloadIndexEntryImpl.of(entryId, dis.readInt(),
- dis.readLong(), dataHeaderLength));
+ dis.readLong(), dataHeaderLength));
}
-
return this;
}
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockV2BuilderImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockV2BuilderImpl.java
new file mode 100644
index 0000000..fa8bc14
--- /dev/null
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockV2BuilderImpl.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+import com.google.common.collect.Lists;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2Builder;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+
+/**
+ * Interface for builder of index block used for offload a ledger to long term storage.
+ */
+public class OffloadIndexBlockV2BuilderImpl implements OffloadIndexBlockBuilder, OffloadIndexBlockV2Builder {
+
+ private final Map<Long, LedgerInfo> ledgerMetadataMap;
+ private LedgerMetadata ledgerMetadata;
+ private long dataObjectLength;
+ private long dataHeaderLength;
+ private List<OffloadIndexEntryImpl> entries;
+ private int lastBlockSize;
+ private int lastStreamingBlockSize;
+ private long streamingOffset = 0;
+ private final SortedMap<Long, List<OffloadIndexEntryImpl>> entryMap = new TreeMap<>();
+
+
+ public OffloadIndexBlockV2BuilderImpl() {
+ this.entries = Lists.newArrayList();
+ this.ledgerMetadataMap = new HashMap<>();
+ }
+
+ @Override
+ public OffloadIndexBlockV2BuilderImpl withDataObjectLength(long dataObjectLength) {
+ this.dataObjectLength = dataObjectLength;
+ return this;
+ }
+
+ @Override
+ public OffloadIndexBlockV2BuilderImpl withDataBlockHeaderLength(long dataHeaderLength) {
+ this.dataHeaderLength = dataHeaderLength;
+ return this;
+ }
+
+ @Override
+ public OffloadIndexBlockV2BuilderImpl withLedgerMetadata(LedgerMetadata metadata) {
+ this.ledgerMetadata = metadata;
+ return this;
+ }
+
+ @Override
+ public OffloadIndexBlockV2BuilderImpl addLedgerMeta(Long ledgerId, LedgerInfo metadata) {
+ this.ledgerMetadataMap.put(ledgerId, metadata);
+ return this;
+ }
+
+ @Override
+ public OffloadIndexBlockBuilder addBlock(long firstEntryId, int partId, int blockSize) {
+ checkState(dataHeaderLength > 0);
+
+ // we should added one by one.
+ long offset;
+ if (firstEntryId == 0) {
+ checkState(entries.size() == 0);
+ offset = 0;
+ } else {
+ checkState(entries.size() > 0);
+ offset = entries.get(entries.size() - 1).getOffset() + lastBlockSize;
+ }
+ lastBlockSize = blockSize;
+
+ this.entries.add(OffloadIndexEntryImpl.of(firstEntryId, partId, offset, dataHeaderLength));
+ return this;
+ }
+
+ @Override
+ public OffloadIndexBlockV2Builder addBlock(long ledgerId, long firstEntryId, int partId, int blockSize) {
+ checkState(dataHeaderLength > 0);
+
+ streamingOffset = streamingOffset + lastStreamingBlockSize;
+ lastStreamingBlockSize = blockSize;
+
+ final List<OffloadIndexEntryImpl> list = entryMap.getOrDefault(ledgerId, new LinkedList<>());
+ list.add(OffloadIndexEntryImpl.of(firstEntryId, partId, streamingOffset, dataHeaderLength));
+ entryMap.put(ledgerId, list);
+ return this;
+ }
+
+ @Override
+ public OffloadIndexBlockV2 fromStream(InputStream is) throws IOException {
+ final DataInputStream dataInputStream = new DataInputStream(is);
+ final int magic = dataInputStream.readInt();
+ if (magic == OffloadIndexBlockImpl.getIndexMagicWord()) {
+ return OffloadIndexBlockImpl.get(magic, dataInputStream);
+ } else if (magic == OffloadIndexBlockV2Impl.getIndexMagicWord()) {
+ return OffloadIndexBlockV2Impl.get(magic, dataInputStream);
+ } else {
+ throw new IOException(String.format("Invalid MagicWord. read: 0x%x expected: 0x%x or 0x%x",
+ magic, OffloadIndexBlockImpl.getIndexMagicWord(),
+ OffloadIndexBlockV2Impl.getIndexMagicWord()));
+ }
+ }
+
+ @Override
+ public OffloadIndexBlock build() {
+ checkState(ledgerMetadata != null);
+ checkState(!entries.isEmpty());
+ checkState(dataObjectLength > 0);
+ checkState(dataHeaderLength > 0);
+ return OffloadIndexBlockImpl.get(ledgerMetadata, dataObjectLength, dataHeaderLength, entries);
+ }
+
+ @Override
+ public OffloadIndexBlockV2 buildV2() {
+ checkState(!ledgerMetadataMap.isEmpty());
+ checkState(true);
+ checkState(!entryMap.isEmpty());
+ checkState(dataObjectLength > 0);
+ checkState(dataHeaderLength > 0);
+ return OffloadIndexBlockV2Impl.get(ledgerMetadataMap, dataObjectLength, dataHeaderLength, entryMap);
+ }
+
+}
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockV2Impl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockV2Impl.java
new file mode 100644
index 0000000..d58a4c1
--- /dev/null
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockV2Impl.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.TreeMap;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock.IndexInputStream;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OffloadIndexBlockV2Impl implements OffloadIndexBlockV2 {
+ private static final Logger log = LoggerFactory.getLogger(OffloadIndexBlockImpl.class);
+
+ private static final int INDEX_MAGIC_WORD = 0x3D1FB0BC;
+
+ private Map<Long, LedgerInfo> segmentMetadata;
+ final private Map<Long, LedgerMetadata> compatibleMetadata = Maps.newTreeMap();
+ private long dataObjectLength;
+ private long dataHeaderLength;
+ // private TreeMap<Long, OffloadIndexEntryImpl> indexEntries;
+ private Map<Long, TreeMap<Long, OffloadIndexEntryImpl>> indexEntries;
+
+
+ private final Handle<OffloadIndexBlockV2Impl> recyclerHandle;
+
+ private static final Recycler<OffloadIndexBlockV2Impl> RECYCLER = new Recycler<OffloadIndexBlockV2Impl>() {
+ @Override
+ protected OffloadIndexBlockV2Impl newObject(Handle<OffloadIndexBlockV2Impl> handle) {
+ return new OffloadIndexBlockV2Impl(handle);
+ }
+ };
+
+ private OffloadIndexBlockV2Impl(Handle<OffloadIndexBlockV2Impl> recyclerHandle) {
+ this.recyclerHandle = recyclerHandle;
+ }
+
+ public static OffloadIndexBlockV2Impl get(Map<Long, LedgerInfo> metadata, long dataObjectLength,
+ long dataHeaderLength,
+ Map<Long, List<OffloadIndexEntryImpl>> entries) {
+ OffloadIndexBlockV2Impl block = RECYCLER.get();
+ block.indexEntries = new HashMap<>();
+ entries.forEach((ledgerId, list) -> {
+ final TreeMap<Long, OffloadIndexEntryImpl> inLedger = block.indexEntries
+ .getOrDefault(ledgerId, new TreeMap<>());
+ list.forEach(indexEntry -> {
+ inLedger.put(indexEntry.getEntryId(), indexEntry);
+ });
+ block.indexEntries.put(ledgerId, inLedger);
+ });
+
+ block.segmentMetadata = metadata;
+ block.dataObjectLength = dataObjectLength;
+ block.dataHeaderLength = dataHeaderLength;
+ return block;
+ }
+
+ public static OffloadIndexBlockV2Impl get(int magic, DataInputStream stream) throws IOException {
+ OffloadIndexBlockV2Impl block = RECYCLER.get();
+ block.indexEntries = Maps.newTreeMap();
+ block.segmentMetadata = Maps.newTreeMap();
+ if (magic != INDEX_MAGIC_WORD) {
+ throw new IOException(String.format("Invalid MagicWord. read: 0x%x expected: 0x%x",
+ magic, INDEX_MAGIC_WORD));
+ }
+ block.fromStream(stream);
+ return block;
+ }
+
+ public void recycle() {
+ dataObjectLength = -1;
+ dataHeaderLength = -1;
+ segmentMetadata = null;
+ indexEntries.clear();
+ indexEntries = null;
+ if (recyclerHandle != null) {
+ recyclerHandle.recycle(this);
+ }
+ }
+
+ @Override
+ public OffloadIndexEntry getIndexEntryForEntry(long ledgerId, long messageEntryId) throws IOException {
+ if (messageEntryId > getLedgerMetadata(ledgerId).getLastEntryId()) {
+ log.warn("Try to get entry: {}, which beyond lastEntryId {}, return null",
+ messageEntryId, getLedgerMetadata(ledgerId).getLastEntryId());
+ throw new IndexOutOfBoundsException("Entry index: " + messageEntryId
+ + " beyond lastEntryId: " + getLedgerMetadata(ledgerId).getLastEntryId());
+ }
+ // find the greatest mapping Id whose entryId <= messageEntryId
+ return this.indexEntries.get(ledgerId).floorEntry(messageEntryId).getValue();
+ }
+
+ public long getStartEntryId(long ledgerId) {
+ return this.indexEntries.get(ledgerId).firstEntry().getValue().getEntryId();
+ }
+
+ @Override
+ public int getEntryCount() {
+ int ans = 0;
+ for (TreeMap<Long, OffloadIndexEntryImpl> v : this.indexEntries.values()) {
+ ans += v.size();
+ }
+
+ return ans;
+ }
+
+ @Override
+ public LedgerMetadata getLedgerMetadata(long ledgerId) {
+ if (compatibleMetadata.containsKey(ledgerId)) {
+ return compatibleMetadata.get(ledgerId);
+ } else if (segmentMetadata.containsKey(ledgerId)) {
+ final CompatibleMetadata result = new CompatibleMetadata(segmentMetadata.get(ledgerId));
+ compatibleMetadata.put(ledgerId, result);
+ return result;
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public long getDataObjectLength() {
+ return this.dataObjectLength;
+ }
+
+ @Override
+ public long getDataBlockHeaderLength() {
+ return this.dataHeaderLength;
+ }
+
+ /**
+ * Get the content of the index block as InputStream.
+ * Read out in format:
+ * | index_magic_header | index_block_len | data_object_len | data_header_len |
+ * | index_entry_count | segment_metadata_len | segment metadata | index entries... |
+ */
+ @Override
+ public IndexInputStream toStream() throws IOException {
+
+ int indexBlockLength = 4 /* magic header */
+ + 4 /* index block length */
+ + 8 /* data object length */
+ + 8;/* data header length */
+
+ Map<Long, byte[]> metaBytesMap = new HashMap<>();
+ for (Map.Entry<Long, TreeMap<Long, OffloadIndexEntryImpl>> e : this.indexEntries.entrySet()) {
+ Long ledgerId = e.getKey();
+ TreeMap<Long, OffloadIndexEntryImpl> ledgerIndexEntries = e.getValue();
+ int indexEntryCount = ledgerIndexEntries.size();
+ byte[] ledgerMetadataByte = this.segmentMetadata.get(ledgerId).toByteArray();
+ int segmentMetadataLength = ledgerMetadataByte.length;
+ indexBlockLength += 8 /* ledger id length */
+ + 4 /* index entry count */
+ + 4 /* segment metadata length */
+ + segmentMetadataLength
+ + indexEntryCount * (8 + 4 + 8);
+ metaBytesMap.put(ledgerId, ledgerMetadataByte);
+ }
+
+ ByteBuf out = PulsarByteBufAllocator.DEFAULT.buffer(indexBlockLength, indexBlockLength);
+
+ out.writeInt(INDEX_MAGIC_WORD)
+ .writeInt(indexBlockLength)
+ .writeLong(dataObjectLength)
+ .writeLong(dataHeaderLength);
+
+ for (Map.Entry<Long, TreeMap<Long, OffloadIndexEntryImpl>> e : this.indexEntries.entrySet()) {
+ Long ledgerId = e.getKey();
+ TreeMap<Long, OffloadIndexEntryImpl> ledgerIndexEntries = e.getValue();
+ int indexEntryCount = ledgerIndexEntries.size();
+ byte[] ledgerMetadataByte = metaBytesMap.get(ledgerId);
+ out.writeLong(ledgerId)
+ .writeInt(indexEntryCount)
+ .writeInt(ledgerMetadataByte.length)
+ .writeBytes(ledgerMetadataByte);
+ ledgerIndexEntries.values().forEach(idxEntry -> {
+ out.writeLong(idxEntry.getEntryId())
+ .writeInt(idxEntry.getPartId())
+ .writeLong(idxEntry.getOffset());
+ });
+ }
+
+ return new IndexInputStream(new ByteBufInputStream(out, true), indexBlockLength);
+ }
+
+ private static LedgerInfo parseLedgerInfo(byte[] bytes) throws IOException {
+ return LedgerInfo.newBuilder().mergeFrom(bytes).build();
+ }
+
+ private OffloadIndexBlockV2 fromStream(DataInputStream dis) throws IOException {
+
+ dis.readInt(); // no used index block length
+ this.dataObjectLength = dis.readLong();
+ this.dataHeaderLength = dis.readLong();
+ while (dis.available() > 0) {
+ long ledgerId = dis.readLong();
+ int indexEntryCount = dis.readInt();
+ int segmentMetadataLength = dis.readInt();
+
+ byte[] metadataBytes = new byte[segmentMetadataLength];
+
+ if (segmentMetadataLength != dis.read(metadataBytes)) {
+ log.error("Read ledgerMetadata from bytes failed");
+ throw new IOException("Read ledgerMetadata from bytes failed");
+ }
+ final LedgerInfo ledgerInfo = parseLedgerInfo(metadataBytes);
+ this.segmentMetadata.put(ledgerId, ledgerInfo);
+ final TreeMap<Long, OffloadIndexEntryImpl> indexEntries = new TreeMap<>();
+
+ for (int i = 0; i < indexEntryCount; i++) {
+ long entryId = dis.readLong();
+ indexEntries.putIfAbsent(entryId, OffloadIndexEntryImpl.of(entryId, dis.readInt(),
+ dis.readLong(), dataHeaderLength));
+ }
+ this.indexEntries.put(ledgerId, indexEntries);
+ }
+
+ return this;
+ }
+
+ public static int getIndexMagicWord() {
+ return INDEX_MAGIC_WORD;
+ }
+
+ @Override
+ public void close() {
+ recycle();
+ }
+
+ @VisibleForTesting
+ static class CompatibleMetadata implements LedgerMetadata {
+ LedgerInfo ledgerInfo;
+
+ public CompatibleMetadata(LedgerInfo ledgerInfo) {
+ this.ledgerInfo = ledgerInfo;
+ }
+
+ @Override
+ public long getLedgerId() {
+ return ledgerInfo.getLedgerId();
+ }
+
+ @Override
+ public int getEnsembleSize() {
+ return 0;
+ }
+
+ @Override
+ public int getWriteQuorumSize() {
+ return 0;
+ }
+
+ @Override
+ public int getAckQuorumSize() {
+ return 0;
+ }
+
+ @Override
+ public long getLastEntryId() {
+ return ledgerInfo.getEntries() - 1;
+ }
+
+ @Override
+ public long getLength() {
+ return ledgerInfo.getSize();
+ }
+
+ @Override
+ public boolean hasPassword() {
+ return false;
+ }
+
+ @Override
+ public byte[] getPassword() {
+ return new byte[0];
+ }
+
+ @Override
+ public DigestType getDigestType() {
+ return null;
+ }
+
+ @Override
+ public long getCtime() {
+ return 0;
+ }
+
+ @Override
+ public boolean isClosed() {
+ return true;
+ }
+
+ @Override
+ public Map<String, byte[]> getCustomMetadata() {
+ return null;
+ }
+
+ @Override
+ public List<BookieId> getEnsembleAt(long entryId) {
+ return null;
+ }
+
+ @Override
+ public NavigableMap<Long, ? extends List<BookieId>> getAllEnsembles() {
+ return null;
+ }
+
+ @Override
+ public State getState() {
+ return null;
+ }
+
+ @Override
+ public String toSafeString() {
+ return null;
+ }
+
+ @Override
+ public int getMetadataFormatVersion() {
+ return 0;
+ }
+
+ @Override
+ public long getCToken() {
+ return 0;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CompatibleMetadata that = (CompatibleMetadata) o;
+ return ledgerInfo.equals(that.ledgerInfo);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(ledgerInfo);
+ }
+ }
+}
+
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java
index 10408fe..0a5ebb1 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java
@@ -37,14 +37,17 @@ public class OffloadIndexEntryImpl implements OffloadIndexEntry {
public long getEntryId() {
return entryId;
}
+
@Override
public int getPartId() {
return partId;
}
+
@Override
public long getOffset() {
return offset;
}
+
@Override
public long getDataOffset() {
return offset + blockHeaderSize;
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/StreamingDataBlockHeaderImpl.java
similarity index 71%
copy from tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java
copy to tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/StreamingDataBlockHeaderImpl.java
index 9239ec2..ed383a6 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/StreamingDataBlockHeaderImpl.java
@@ -19,10 +19,8 @@
package org.apache.bookkeeper.mledger.offload.jcloud.impl;
import com.google.common.io.CountingInputStream;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
-import io.netty.buffer.PooledByteBufAllocator;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
@@ -31,44 +29,30 @@ import org.apache.bookkeeper.mledger.offload.jcloud.DataBlockHeader;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
/**
- * The data block header in code storage for each data block.
+ * The data block header in tiered storage for each data block.
*/
-public class DataBlockHeaderImpl implements DataBlockHeader {
- // Magic Word for data block.
+public class StreamingDataBlockHeaderImpl implements DataBlockHeader {
+ // Magic Word for streaming data block.
// It is a sequence of bytes used to identify the start of a block.
- static final int MAGIC_WORD = 0xFBDBABCB;
+ static final int MAGIC_WORD = 0x26A66D32;
// This is bigger than header size. Leaving some place for alignment and future enhancement.
// Payload use this as the start offset.
- private static final int HEADER_MAX_SIZE = 128;
+ public static final int HEADER_MAX_SIZE = 128;
private static final int HEADER_BYTES_USED = 4 /* magic */
- + 8 /* header len */
- + 8 /* block len */
- + 8 /* first entry id */;
+ + 8 /* header len */
+ + 8 /* block len */
+ + 8 /* first entry id */
+ + 8 /* ledger id */;
private static final byte[] PADDING = new byte[HEADER_MAX_SIZE - HEADER_BYTES_USED];
- public static DataBlockHeaderImpl of(int blockLength, long firstEntryId) {
- return new DataBlockHeaderImpl(HEADER_MAX_SIZE, blockLength, firstEntryId);
+ public long getLedgerId() {
+ return ledgerId;
}
- // Construct DataBlockHeader from InputStream, which contains `HEADER_MAX_SIZE` bytes readable.
- public static DataBlockHeader fromStream(InputStream stream) throws IOException {
- CountingInputStream countingStream = new CountingInputStream(stream);
- DataInputStream dis = new DataInputStream(countingStream);
- int magic = dis.readInt();
- if (magic != MAGIC_WORD) {
- throw new IOException("Data block header magic word not match. read: " + magic
- + " expected: " + MAGIC_WORD);
- }
-
- long headerLen = dis.readLong();
- long blockLen = dis.readLong();
- long firstEntryId = dis.readLong();
- long toSkip = headerLen - countingStream.getCount();
- if (dis.skip(toSkip) != toSkip) {
- throw new EOFException("Header was too small");
- }
+ private final long ledgerId;
- return new DataBlockHeaderImpl(headerLen, blockLen, firstEntryId);
+ public static StreamingDataBlockHeaderImpl of(int blockLength, long ledgerId, long firstEntryId) {
+ return new StreamingDataBlockHeaderImpl(HEADER_MAX_SIZE, blockLength, ledgerId, firstEntryId);
}
private final long headerLength;
@@ -98,10 +82,33 @@ public class DataBlockHeaderImpl implements DataBlockHeader {
return this.firstEntryId;
}
- public DataBlockHeaderImpl(long headerLength, long blockLength, long firstEntryId) {
+ public StreamingDataBlockHeaderImpl(long headerLength, long blockLength, long ledgerId, long firstEntryId) {
this.headerLength = headerLength;
this.blockLength = blockLength;
this.firstEntryId = firstEntryId;
+ this.ledgerId = ledgerId;
+ }
+
+ // Construct DataBlockHeader from InputStream, which contains `HEADER_MAX_SIZE` bytes readable.
+ public static StreamingDataBlockHeaderImpl fromStream(InputStream stream) throws IOException {
+ CountingInputStream countingStream = new CountingInputStream(stream);
+ DataInputStream dis = new DataInputStream(countingStream);
+ int magic = dis.readInt();
+ if (magic != MAGIC_WORD) {
+ throw new IOException("Data block header magic word not match. read: " + magic
+ + " expected: " + MAGIC_WORD);
+ }
+
+ long headerLen = dis.readLong();
+ long blockLen = dis.readLong();
+ long firstEntryId = dis.readLong();
+ long ledgerId = dis.readLong();
+ long toSkip = headerLen - countingStream.getCount();
+ if (dis.skip(toSkip) != toSkip) {
+ throw new EOFException("Header was too small");
+ }
+
+ return new StreamingDataBlockHeaderImpl(headerLen, blockLen, ledgerId, firstEntryId);
}
/**
@@ -113,10 +120,11 @@ public class DataBlockHeaderImpl implements DataBlockHeader {
public InputStream toStream() {
ByteBuf out = PulsarByteBufAllocator.DEFAULT.buffer(HEADER_MAX_SIZE, HEADER_MAX_SIZE);
out.writeInt(MAGIC_WORD)
- .writeLong(headerLength)
- .writeLong(blockLength)
- .writeLong(firstEntryId)
- .writeBytes(PADDING);
+ .writeLong(headerLength)
+ .writeLong(blockLength)
+ .writeLong(firstEntryId)
+ .writeLong(ledgerId)
+ .writeBytes(PADDING);
// true means the input stream will release the ByteBuf on close
return new ByteBufInputStream(out, true);
@@ -124,8 +132,8 @@ public class DataBlockHeaderImpl implements DataBlockHeader {
@Override
public String toString() {
- return String.format("DataBlockHeader(len:%d,hlen:%d,firstEntry:%d)",
- blockLength, headerLength, firstEntryId);
+ return String.format("StreamingDataBlockHeader(len:%d,hlen:%d,firstEntry:%d,ledger:%d)",
+ blockLength, headerLength, firstEntryId, ledgerId);
}
}
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
index 7d7d7fa..5aa5765 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
@@ -61,8 +61,16 @@ public class TieredStorageConfiguration {
public static final String METADATA_FIELD_REGION = "region";
public static final String METADATA_FIELD_ENDPOINT = "serviceEndpoint";
public static final String METADATA_FIELD_MAX_BLOCK_SIZE = "maxBlockSizeInBytes";
+ public static final String METADATA_FIELD_MIN_BLOCK_SIZE = "minBlockSizeInBytes";
public static final String METADATA_FIELD_READ_BUFFER_SIZE = "readBufferSizeInBytes";
+ public static final String METADATA_FIELD_WRITE_BUFFER_SIZE = "writeBufferSizeInBytes";
public static final String OFFLOADER_PROPERTY_PREFIX = "managedLedgerOffload";
+ public static final String MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC = "maxOffloadSegmentRolloverTimeInSeconds";
+ public static final String MIN_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC = "minOffloadSegmentRolloverTimeInSeconds";
+ public static final long DEFAULT_MAX_SEGMENT_TIME_IN_SECOND = 600;
+ public static final long DEFAULT_MIN_SEGMENT_TIME_IN_SECOND = 0;
+ public static final String MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES = "maxOffloadSegmentSizeInBytes";
+ public static final long DEFAULT_MAX_SEGMENT_SIZE_IN_BYTES = 1024 * 1024 * 1024;
protected static final int MB = 1024 * 1024;
@@ -177,6 +185,30 @@ public class TieredStorageConfiguration {
return null;
}
+ public long getMaxSegmentTimeInSecond() {
+ if (configProperties.containsKey(MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC)) {
+ return Long.parseLong(configProperties.get(MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC));
+ } else {
+ return DEFAULT_MAX_SEGMENT_TIME_IN_SECOND;
+ }
+ }
+
+ public long getMinSegmentTimeInSecond() {
+ if (configProperties.containsKey(MIN_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC)) {
+ return Long.parseLong(configProperties.get(MIN_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC));
+ } else {
+ return DEFAULT_MIN_SEGMENT_TIME_IN_SECOND;
+ }
+ }
+
+ public long getMaxSegmentSizeInBytes() {
+ if (configProperties.containsKey(MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES)) {
+ return Long.parseLong(configProperties.get(MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES));
+ } else {
+ return DEFAULT_MAX_SEGMENT_SIZE_IN_BYTES;
+ }
+ }
+
public void setServiceEndpoint(String s) {
configProperties.put(getKeyName(METADATA_FIELD_ENDPOINT), s);
}
@@ -213,6 +245,15 @@ public class TieredStorageConfiguration {
return new Integer(64 * MB);
}
+ public Integer getMinBlockSizeInBytes() {
+ for (String key : getKeys(METADATA_FIELD_MIN_BLOCK_SIZE)) {
+ if (configProperties.containsKey(key)) {
+ return Integer.valueOf(configProperties.get(key));
+ }
+ }
+ return 5 * MB;
+ }
+
public Integer getReadBufferSizeInBytes() {
for (String key : getKeys(METADATA_FIELD_READ_BUFFER_SIZE)) {
if (configProperties.containsKey(key)) {
@@ -222,6 +263,15 @@ public class TieredStorageConfiguration {
return new Integer(MB);
}
+ public Integer getWriteBufferSizeInBytes() {
+ for (String key : getKeys(METADATA_FIELD_WRITE_BUFFER_SIZE)) {
+ if (configProperties.containsKey(key)) {
+ return Integer.valueOf(configProperties.get(key));
+ }
+ }
+ return 10 * MB;
+ }
+
public Supplier<Credentials> getProviderCredentials() {
if (credentials == null) {
getProvider().buildCredentials(this);
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
index a2944f5..a646999 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
@@ -19,21 +19,17 @@
package org.apache.bookkeeper.mledger.offload.jcloud.impl;
import static java.nio.charset.StandardCharsets.UTF_8;
-
-import java.lang.reflect.Method;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.client.api.DigestType;
-import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
-import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockHeaderImpl;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
import org.apache.bookkeeper.util.ZkUtils;
@@ -42,29 +38,22 @@ import org.apache.pulsar.jcloud.shade.com.google.common.base.Supplier;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.data.ACL;
-
import org.jclouds.blobstore.BlobStore;
-import org.jclouds.blobstore.domain.Blob;
import org.jclouds.domain.Credentials;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testng.Assert;
-import com.google.common.util.concurrent.MoreExecutors;
-
public abstract class BlobStoreManagedLedgerOffloaderBase {
public final static String BUCKET = "pulsar-unittest";
protected static final int DEFAULT_BLOCK_SIZE = 5*1024*1024;
protected static final int DEFAULT_READ_BUFFER_SIZE = 1*1024*1024;
-
+
protected final OrderedScheduler scheduler;
protected final PulsarMockBookKeeper bk;
protected final JCloudBlobStoreProvider provider;
protected TieredStorageConfiguration config;
protected BlobStore blobStore = null;
-
+
protected BlobStoreManagedLedgerOffloaderBase() throws Exception {
scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(5).name("offloader").build();
bk = new PulsarMockBookKeeper(createMockZooKeeper(), scheduler.chooseThread(this));
@@ -82,7 +71,11 @@ public abstract class BlobStoreManagedLedgerOffloaderBase {
CreateMode.PERSISTENT);
return zk;
}
-
+
+ protected static MockManagedLedger createMockManagedLedger() {
+ return new MockManagedLedger();
+ }
+
/*
* Determine which BlobStore Provider to test based on the System properties
*/
@@ -95,7 +88,7 @@ public abstract class BlobStoreManagedLedgerOffloaderBase {
return JCloudBlobStoreProvider.TRANSIENT;
}
}
-
+
/*
* Get the credentials to use for the JCloud provider
* based on the System properties.
@@ -108,7 +101,7 @@ public abstract class BlobStoreManagedLedgerOffloaderBase {
* props.setProperty("S3Key", "HXXXXXß");
*/
return () -> new Credentials(System.getProperty("S3ID"), System.getProperty("S3Key"));
-
+
} else if (Boolean.parseBoolean(System.getProperty("testRealGCS", "false"))) {
/*
* To use this, must config credentials using "client_email" as GCSID and "private_key" as GCSKey.
@@ -121,17 +114,24 @@ public abstract class BlobStoreManagedLedgerOffloaderBase {
return null;
}
}
-
+
protected TieredStorageConfiguration getConfiguration(String bucket) {
- Map<String, String> metaData = new HashMap<String, String> ();
+ return getConfiguration(bucket, null);
+ }
+
+ protected TieredStorageConfiguration getConfiguration(String bucket, Map<String, String> additionalConfig) {
+ Map<String, String> metaData = new HashMap<String, String>();
+ if (additionalConfig != null) {
+ metaData.putAll(additionalConfig);
+ }
metaData.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, provider.getDriver());
metaData.put(getConfigKey(TieredStorageConfiguration.METADATA_FIELD_REGION), "");
metaData.put(getConfigKey(TieredStorageConfiguration.METADATA_FIELD_BUCKET), bucket);
metaData.put(getConfigKey(TieredStorageConfiguration.METADATA_FIELD_ENDPOINT), "");
-
+
TieredStorageConfiguration config = TieredStorageConfiguration.create(metaData);
config.setProviderCredentials(getBlobStoreCredentials());
-
+
return config;
}
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
new file mode 100644
index 0000000..32ad498
--- /dev/null
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
@@ -0,0 +1,445 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import static org.mockito.AdditionalAnswers.delegatesTo;
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.LedgerOffloader.OffloadHandle;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
+import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext;
+import org.jclouds.blobstore.BlobStore;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class BlobStoreManagedLedgerOffloaderStreamingTest extends BlobStoreManagedLedgerOffloaderBase {
+
+ private static final Logger log = LoggerFactory.getLogger(BlobStoreManagedLedgerOffloaderStreamingTest.class);
+ private TieredStorageConfiguration mockedConfig;
+ private static final Random random = new Random();
+
+ BlobStoreManagedLedgerOffloaderStreamingTest() throws Exception {
+ super();
+ config = getConfiguration(BUCKET);
+ JCloudBlobStoreProvider provider = getBlobStoreProvider();
+ assertNotNull(provider);
+ provider.validate(config);
+ blobStore = provider.getBlobStore(config);
+ }
+
+ private BlobStoreManagedLedgerOffloader getOffloader(Map<String, String> additionalConfig) throws IOException {
+ return getOffloader(BUCKET, additionalConfig);
+ }
+
+ private BlobStoreManagedLedgerOffloader getOffloader(BlobStore mockedBlobStore,
+ Map<String, String> additionalConfig) throws IOException {
+ return getOffloader(BUCKET, mockedBlobStore, additionalConfig);
+ }
+
+ private BlobStoreManagedLedgerOffloader getOffloader(String bucket, Map<String, String> additionalConfig) throws
+ IOException {
+ mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket, additionalConfig)));
+ Mockito.doReturn(blobStore).when(mockedConfig).getBlobStore(); // Use the REAL blobStore
+ BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader
+ .create(mockedConfig, new HashMap<String, String>(), scheduler);
+ return offloader;
+ }
+
+ private BlobStoreManagedLedgerOffloader getOffloader(String bucket, BlobStore mockedBlobStore,
+ Map<String, String> additionalConfig) throws IOException {
+ mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket, additionalConfig)));
+ Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore();
+ BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader
+ .create(mockedConfig, new HashMap<String, String>(), scheduler);
+ return offloader;
+ }
+
+ @Test
+ public void testHappyCase() throws Exception {
+ LedgerOffloader offloader = getOffloader(new HashMap<String, String>() {{
+ put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
+ put(config.getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE).get(0), "5242880");
+ put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
+ }});
+ ManagedLedger ml = createMockManagedLedger();
+ UUID uuid = UUID.randomUUID();
+ long beginLedger = 0;
+ long beginEntry = 0;
+ log.error("try begin offload");
+ OffloadHandle offloadHandle = offloader
+ .streamingOffload(ml, uuid, beginLedger, beginEntry, new HashMap<>()).get();
+ //Segment should closed because size in bytes full
+ for (int i = 0; i < 10; i++) {
+ final byte[] data = new byte[100];
+ random.nextBytes(data);
+ final OffloadHandle.OfferEntryResult offerEntryResult = offloadHandle
+ .offerEntry(EntryImpl.create(0, i, data));
+ log.info("offer result: {}", offerEntryResult);
+ }
+ final LedgerOffloader.OffloadResult offloadResult = offloadHandle.getOffloadResultAsync().get();
+ log.info("Offload reasult: {}", offloadResult);
+ }
+
+ @Test
+ public void testReadAndWrite() throws Exception {
+ LedgerOffloader offloader = getOffloader(new HashMap<String, String>() {{
+ put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
+ put(config.getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE).get(0), "5242880");
+ put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
+ }});
+ ManagedLedger ml = createMockManagedLedger();
+ UUID uuid = UUID.randomUUID();
+ long beginLedger = 0;
+ long beginEntry = 0;
+
+ Map<String, String> driverMeta = new HashMap<String, String>() {{
+ put(TieredStorageConfiguration.METADATA_FIELD_BUCKET, BUCKET);
+ }};
+ OffloadHandle offloadHandle = offloader
+ .streamingOffload(ml, uuid, beginLedger, beginEntry, driverMeta).get();
+
+ //Segment should closed because size in bytes full
+ final LinkedList<Entry> entries = new LinkedList<>();
+ for (int i = 0; i < 10; i++) {
+ final byte[] data = new byte[100];
+ random.nextBytes(data);
+ final EntryImpl entry = EntryImpl.create(0, i, data);
+ offloadHandle.offerEntry(entry);
+ entries.add(entry);
+ }
+
+ final LedgerOffloader.OffloadResult offloadResult = offloadHandle.getOffloadResultAsync().get();
+ assertEquals(offloadResult.endLedger, 0);
+ assertEquals(offloadResult.endEntry, 9);
+ final OffloadContext.Builder contextBuilder = OffloadContext.newBuilder();
+ contextBuilder.addOffloadSegment(
+ MLDataFormats.OffloadSegment.newBuilder()
+ .setUidLsb(uuid.getLeastSignificantBits())
+ .setUidMsb(uuid.getMostSignificantBits())
+ .setComplete(true).setEndEntryId(9).build());
+
+ final ReadHandle readHandle = offloader.readOffloaded(0, contextBuilder.build(), driverMeta).get();
+ final LedgerEntries ledgerEntries = readHandle.readAsync(0, 9).get();
+
+ for (LedgerEntry ledgerEntry : ledgerEntries) {
+ final EntryImpl storedEntry = (EntryImpl) entries.get((int) ledgerEntry.getEntryId());
+ final byte[] storedData = storedEntry.getData();
+ final byte[] entryBytes = ledgerEntry.getEntryBytes();
+ assertEquals(storedData, entryBytes);
+ }
+ }
+
+ @Test
+ public void testReadAndWriteAcrossLedger() throws Exception {
+ LedgerOffloader offloader = getOffloader(new HashMap<String, String>() {{
+ put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "2000");
+ put(config.getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE).get(0), "5242880");
+ put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
+ }});
+ ManagedLedger ml = createMockManagedLedger();
+ UUID uuid = UUID.randomUUID();
+ long beginLedger = 0;
+ long beginEntry = 0;
+
+ Map<String, String> driverMeta = new HashMap<String, String>() {{
+ put(TieredStorageConfiguration.METADATA_FIELD_BUCKET, BUCKET);
+ }};
+ OffloadHandle offloadHandle = offloader
+ .streamingOffload(ml, uuid, beginLedger, beginEntry, driverMeta).get();
+
+ //Segment should closed because size in bytes full
+ final LinkedList<Entry> entries = new LinkedList<>();
+ final LinkedList<Entry> ledger2Entries = new LinkedList<>();
+ for (int i = 0; i < 10; i++) {
+ final byte[] data = new byte[100];
+ random.nextBytes(data);
+ final EntryImpl entry = EntryImpl.create(0, i, data);
+ offloadHandle.offerEntry(entry);
+ entries.add(entry);
+ }
+ for (int i = 0; i < 10; i++) {
+ final byte[] data = new byte[100];
+ random.nextBytes(data);
+ final EntryImpl entry = EntryImpl.create(1, i, data);
+ offloadHandle.offerEntry(entry);
+ ledger2Entries.add(entry);
+ }
+
+ final LedgerOffloader.OffloadResult offloadResult = offloadHandle.getOffloadResultAsync().get();
+ assertEquals(offloadResult.endLedger, 1);
+ assertEquals(offloadResult.endEntry, 9);
+ final OffloadContext.Builder contextBuilder = OffloadContext.newBuilder();
+ contextBuilder.addOffloadSegment(
+ MLDataFormats.OffloadSegment.newBuilder()
+ .setUidLsb(uuid.getLeastSignificantBits())
+ .setUidMsb(uuid.getMostSignificantBits())
+ .setComplete(true).setEndEntryId(9).build());
+
+ final ReadHandle readHandle = offloader.readOffloaded(0, contextBuilder.build(), driverMeta).get();
+ final LedgerEntries ledgerEntries = readHandle.readAsync(0, 9).get();
+
+ for (LedgerEntry ledgerEntry : ledgerEntries) {
+ final EntryImpl storedEntry = (EntryImpl) entries.get((int) ledgerEntry.getEntryId());
+ final byte[] storedData = storedEntry.getData();
+ final byte[] entryBytes = ledgerEntry.getEntryBytes();
+ assertEquals(storedData, entryBytes);
+ }
+
+ final ReadHandle readHandle2 = offloader.readOffloaded(1, contextBuilder.build(), driverMeta).get();
+ final LedgerEntries ledgerEntries2 = readHandle2.readAsync(0, 9).get();
+
+ for (LedgerEntry ledgerEntry : ledgerEntries2) {
+ final EntryImpl storedEntry = (EntryImpl) ledger2Entries.get((int) ledgerEntry.getEntryId());
+ final byte[] storedData = storedEntry.getData();
+ final byte[] entryBytes = ledgerEntry.getEntryBytes();
+ assertEquals(storedData, entryBytes);
+ }
+ }
+
+ @Test
+ public void testReadAndWriteAcrossSegment() throws Exception {
+ LedgerOffloader offloader = getOffloader(new HashMap<String, String>() {{
+ put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
+ put(config.getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE).get(0), "5242880");
+ put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
+ }});
+ LedgerOffloader offloader2 = getOffloader(new HashMap<String, String>() {{
+ put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
+ put(config.getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE).get(0), "5242880");
+ put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
+ }});
+ ManagedLedger ml = createMockManagedLedger();
+ UUID uuid = UUID.randomUUID();
+ UUID uuid2 = UUID.randomUUID();
+ long beginLedger = 0;
+ long beginEntry = 0;
+
+ Map<String, String> driverMeta = new HashMap<String, String>() {{
+ put(TieredStorageConfiguration.METADATA_FIELD_BUCKET, BUCKET);
+ }};
+ OffloadHandle offloadHandle = offloader
+ .streamingOffload(ml, uuid, beginLedger, beginEntry, driverMeta).get();
+
+ //Segment should closed because size in bytes full
+ final LinkedList<Entry> entries = new LinkedList<>();
+ for (int i = 0; i < 10; i++) {
+ final byte[] data = new byte[100];
+ random.nextBytes(data);
+ final EntryImpl entry = EntryImpl.create(0, i, data);
+ offloadHandle.offerEntry(entry);
+ entries.add(entry);
+ }
+
+ final LedgerOffloader.OffloadResult offloadResult = offloadHandle.getOffloadResultAsync().get();
+ assertEquals(offloadResult.endLedger, 0);
+ assertEquals(offloadResult.endEntry, 9);
+
+ //Segment should closed because size in bytes full
+ OffloadHandle offloadHandle2 = offloader2
+ .streamingOffload(ml, uuid2, beginLedger, 10, driverMeta).get();
+ for (int i = 0; i < 10; i++) {
+ final byte[] data = new byte[100];
+ random.nextBytes(data);
+ final EntryImpl entry = EntryImpl.create(0, i + 10, data);
+ offloadHandle2.offerEntry(entry);
+ entries.add(entry);
+ }
+ final LedgerOffloader.OffloadResult offloadResult2 = offloadHandle2.getOffloadResultAsync().get();
+ assertEquals(offloadResult2.endLedger, 0);
+ assertEquals(offloadResult2.endEntry, 19);
+
+ final OffloadContext.Builder contextBuilder = OffloadContext.newBuilder();
+ contextBuilder.addOffloadSegment(
+ MLDataFormats.OffloadSegment.newBuilder()
+ .setUidLsb(uuid.getLeastSignificantBits())
+ .setUidMsb(uuid.getMostSignificantBits())
+ .setComplete(true).setEndEntryId(9).build()).addOffloadSegment(
+ MLDataFormats.OffloadSegment.newBuilder()
+ .setUidLsb(uuid2.getLeastSignificantBits())
+ .setUidMsb(uuid2.getMostSignificantBits())
+ .setComplete(true).setEndEntryId(19).build()
+ );
+
+ final ReadHandle readHandle = offloader.readOffloaded(0, contextBuilder.build(), driverMeta).get();
+ final LedgerEntries ledgerEntries = readHandle.readAsync(0, 19).get();
+
+ for (LedgerEntry ledgerEntry : ledgerEntries) {
+ final EntryImpl storedEntry = (EntryImpl) entries.get((int) ledgerEntry.getEntryId());
+ final byte[] storedData = storedEntry.getData();
+ final byte[] entryBytes = ledgerEntry.getEntryBytes();
+ assertEquals(storedData, entryBytes);
+ }
+ }
+
+ @Test
+ public void testRandomRead() throws Exception {
+ LedgerOffloader offloader = getOffloader(new HashMap<String, String>() {{
+ put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
+ put(config.getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE).get(0), "5242880");
+ put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
+ }});
+ LedgerOffloader offloader2 = getOffloader(new HashMap<String, String>() {{
+ put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
+ put(config.getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE).get(0), "5242880");
+ put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
+ }});
+ ManagedLedger ml = createMockManagedLedger();
+ UUID uuid = UUID.randomUUID();
+ UUID uuid2 = UUID.randomUUID();
+ long beginLedger = 0;
+ long beginEntry = 0;
+
+ Map<String, String> driverMeta = new HashMap<String, String>() {{
+ put(TieredStorageConfiguration.METADATA_FIELD_BUCKET, BUCKET);
+ }};
+ OffloadHandle offloadHandle = offloader
+ .streamingOffload(ml, uuid, beginLedger, beginEntry, driverMeta).get();
+
+ //Segment should closed because size in bytes full
+ final LinkedList<Entry> entries = new LinkedList<>();
+ for (int i = 0; i < 10; i++) {
+ final byte[] data = new byte[100];
+ random.nextBytes(data);
+ final EntryImpl entry = EntryImpl.create(0, i, data);
+ offloadHandle.offerEntry(entry);
+ entries.add(entry);
+ }
+
+ final LedgerOffloader.OffloadResult offloadResult = offloadHandle.getOffloadResultAsync().get();
+ assertEquals(offloadResult.endLedger, 0);
+ assertEquals(offloadResult.endEntry, 9);
+
+ //Segment should closed because size in bytes full
+ OffloadHandle offloadHandle2 = offloader2
+ .streamingOffload(ml, uuid2, beginLedger, 10, driverMeta).get();
+ for (int i = 0; i < 10; i++) {
+ final byte[] data = new byte[100];
+ random.nextBytes(data);
+ final EntryImpl entry = EntryImpl.create(0, i + 10, data);
+ offloadHandle2.offerEntry(entry);
+ entries.add(entry);
+ }
+ final LedgerOffloader.OffloadResult offloadResult2 = offloadHandle2.getOffloadResultAsync().get();
+ assertEquals(offloadResult2.endLedger, 0);
+ assertEquals(offloadResult2.endEntry, 19);
+
+ final OffloadContext.Builder contextBuilder = OffloadContext.newBuilder();
+ contextBuilder.addOffloadSegment(
+ MLDataFormats.OffloadSegment.newBuilder()
+ .setUidLsb(uuid.getLeastSignificantBits())
+ .setUidMsb(uuid.getMostSignificantBits())
+ .setComplete(true).setEndEntryId(9).build()).addOffloadSegment(
+ MLDataFormats.OffloadSegment.newBuilder()
+ .setUidLsb(uuid2.getLeastSignificantBits())
+ .setUidMsb(uuid2.getMostSignificantBits())
+ .setComplete(true).setEndEntryId(19).build()
+ );
+
+ final ReadHandle readHandle = offloader.readOffloaded(0, contextBuilder.build(), driverMeta).get();
+
+ for (int i = 0; i <= 19; i++) {
+ Random seed = new Random(0);
+ int begin = seed.nextInt(20);
+ int end = seed.nextInt(20);
+ if (begin >= end) {
+ int temp = begin;
+ begin = end;
+ end = temp;
+ }
+ final LedgerEntries ledgerEntries = readHandle.readAsync(begin, end).get();
+ for (LedgerEntry ledgerEntry : ledgerEntries) {
+ final EntryImpl storedEntry = (EntryImpl) entries.get((int) ledgerEntry.getEntryId());
+ final byte[] storedData = storedEntry.getData();
+ final byte[] entryBytes = ledgerEntry.getEntryBytes();
+ assertEquals(storedData, entryBytes);
+ }
+ }
+ }
+
+ @Test
+ public void testInvalidEntryIds() throws Exception {
+ LedgerOffloader offloader = getOffloader(new HashMap<String, String>() {{
+ put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
+ put(config.getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE).get(0), "5242880");
+ put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
+ }});
+ ManagedLedger ml = createMockManagedLedger();
+ UUID uuid = UUID.randomUUID();
+ long beginLedger = 0;
+ long beginEntry = 0;
+
+ Map<String, String> driverMeta = new HashMap<String, String>() {{
+ put(TieredStorageConfiguration.METADATA_FIELD_BUCKET, BUCKET);
+ }};
+ OffloadHandle offloadHandle = offloader
+ .streamingOffload(ml, uuid, beginLedger, beginEntry, driverMeta).get();
+
+ //Segment should closed because size in bytes full
+ final LinkedList<Entry> entries = new LinkedList<>();
+ for (int i = 0; i < 10; i++) {
+ final byte[] data = new byte[100];
+ random.nextBytes(data);
+ final EntryImpl entry = EntryImpl.create(0, i, data);
+ offloadHandle.offerEntry(entry);
+ entries.add(entry);
+ }
+
+ final LedgerOffloader.OffloadResult offloadResult = offloadHandle.getOffloadResultAsync().get();
+ assertEquals(offloadResult.endLedger, 0);
+ assertEquals(offloadResult.endEntry, 9);
+ final OffloadContext.Builder contextBuilder = OffloadContext.newBuilder();
+ contextBuilder.addOffloadSegment(
+ MLDataFormats.OffloadSegment.newBuilder()
+ .setUidLsb(uuid.getLeastSignificantBits())
+ .setUidMsb(uuid.getMostSignificantBits())
+ .setComplete(true).setEndEntryId(9).build());
+
+ final ReadHandle readHandle = offloader.readOffloaded(0, contextBuilder.build(), driverMeta).get();
+ try {
+ readHandle.read(-1, -1);
+ Assert.fail("Shouldn't be able to read anything");
+ } catch (Exception e) {
+ }
+
+ try {
+ readHandle.read(0, 20);
+ Assert.fail("Shouldn't be able to read anything");
+ } catch (Exception e) {
+ }
+ }
+}
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BufferedOffloadStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BufferedOffloadStreamTest.java
new file mode 100644
index 0000000..d0b433b
--- /dev/null
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BufferedOffloadStreamTest.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
+import com.google.common.io.ByteStreams;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.OffloadSegmentInfoImpl;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.testng.Assert;
+
+public class BufferedOffloadStreamTest {
+ final Random random = new Random();
+
+ public void testWithPadding(int paddingLen) throws Exception {
+ int blockSize = StreamingDataBlockHeaderImpl.getDataStartOffset();
+ List<Entry> entryBuffer = new LinkedList<>();
+ final UUID uuid = UUID.randomUUID();
+ OffloadSegmentInfoImpl segmentInfo = new OffloadSegmentInfoImpl(uuid, 0, 0, "",
+ new HashMap<>());
+ final int entryCount = 10;
+ List<Entry> entries = new ArrayList<>();
+ for (int i = 0; i < entryCount; i++) {
+ final byte[] bytes = new byte[random.nextInt(10)];
+ final EntryImpl entry = EntryImpl.create(0, i, bytes);
+ entries.add(entry);
+ entry.retain();
+ entryBuffer.add(entry);
+ blockSize += BufferedOffloadStream.ENTRY_HEADER_SIZE + entry.getLength();
+ }
+ segmentInfo.closeSegment(0, 9);
+ blockSize += paddingLen;
+
+ final BufferedOffloadStream inputStream = new BufferedOffloadStream(blockSize, entryBuffer,
+ segmentInfo.beginLedgerId,
+ segmentInfo.beginEntryId);
+ assertEquals(inputStream.getLedgerId(), 0);
+ assertEquals(inputStream.getBeginEntryId(), 0);
+ assertEquals(inputStream.getBlockSize(), blockSize);
+
+ byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()];
+ ByteStreams.readFully(inputStream, headerB);
+ StreamingDataBlockHeaderImpl headerRead = StreamingDataBlockHeaderImpl
+ .fromStream(new ByteArrayInputStream(headerB));
+ Assert.assertEquals(headerRead.getBlockLength(), blockSize);
+ Assert.assertEquals(headerRead.getFirstEntryId(), 0);
+
+ int left = blockSize - DataBlockHeaderImpl.getDataStartOffset();
+ for (int i = 0; i < entryCount; i++) {
+ byte lengthBuf[] = new byte[4];
+ byte entryIdBuf[] = new byte[8];
+ byte content[] = new byte[entries.get(i).getLength()];
+
+ left -= lengthBuf.length + entryIdBuf.length + content.length;
+ inputStream.read(lengthBuf);
+ inputStream.read(entryIdBuf);
+ inputStream.read(content);
+ Assert.assertEquals(entries.get(i).getLength(), Ints.fromByteArray(lengthBuf));
+ Assert.assertEquals(i, Longs.fromByteArray(entryIdBuf));
+ assertArrayEquals(entries.get(i).getData(), content);
+ }
+ assertEquals(left, paddingLen);
+ byte padding[] = new byte[left];
+ inputStream.read(padding);
+
+ ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding);
+ for (int i = 0; i < paddingBuf.capacity() / 4; i++) {
+ Assert.assertEquals(Integer.toHexString(paddingBuf.readInt()),
+ Integer.toHexString(0xFEDCDEAD));
+ }
+
+ // 4. reach end.
+ Assert.assertEquals(inputStream.read(), -1);
+ Assert.assertEquals(inputStream.read(), -1);
+ inputStream.close();
+
+ }
+
+ @Test
+ public void testHavePadding() throws Exception {
+ testWithPadding(10);
+ }
+
+ @Test
+ public void testNoPadding() throws Exception {
+ testWithPadding(0);
+ }
+
+ @Ignore("Disable because let offloader to ensure there is no another ledger id")
+ public void shouldEndWhenSegmentChanged() throws IOException {
+ int blockSize = StreamingDataBlockHeaderImpl.getDataStartOffset();
+ int paddingLen = 10;
+ List<Entry> entryBuffer = new LinkedList<>();
+ final UUID uuid = UUID.randomUUID();
+ OffloadSegmentInfoImpl segmentInfo = new OffloadSegmentInfoImpl(uuid, 0, 0, "",
+ new HashMap<>());
+ AtomicLong bufferLength = new AtomicLong();
+ final int entryCount = 10;
+ List<Entry> entries = new ArrayList<>();
+ for (int i = 0; i < entryCount; i++) {
+ final byte[] bytes = new byte[random.nextInt(10)];
+ final EntryImpl entry = EntryImpl.create(0, i, bytes);
+ entries.add(entry);
+ entry.retain();
+ entryBuffer.add(entry);
+ blockSize += BufferedOffloadStream.ENTRY_HEADER_SIZE + entry.getLength();
+ }
+ //create new ledger
+ {
+ final byte[] bytes = new byte[random.nextInt(10)];
+ final EntryImpl entry = EntryImpl.create(1, 0, bytes);
+ entries.add(entry);
+ entry.retain();
+ entryBuffer.add(entry);
+ }
+ segmentInfo.closeSegment(1, 0);
+ blockSize += paddingLen;
+
+ final BufferedOffloadStream inputStream = new BufferedOffloadStream(blockSize, entryBuffer,
+ segmentInfo.beginLedgerId,
+ segmentInfo.beginEntryId);
+ assertEquals(inputStream.getLedgerId(), 0);
+ assertEquals(inputStream.getBeginEntryId(), 0);
+ assertEquals(inputStream.getBlockSize(), blockSize);
+
+ byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()];
+ ByteStreams.readFully(inputStream, headerB);
+ StreamingDataBlockHeaderImpl headerRead = StreamingDataBlockHeaderImpl
+ .fromStream(new ByteArrayInputStream(headerB));
+ Assert.assertEquals(headerRead.getBlockLength(), blockSize);
+ Assert.assertEquals(headerRead.getFirstEntryId(), 0);
+
+ int left = blockSize - DataBlockHeaderImpl.getDataStartOffset();
+ for (int i = 0; i < entryCount; i++) {
+ byte lengthBuf[] = new byte[4];
+ byte entryIdBuf[] = new byte[8];
+ byte content[] = new byte[entries.get(i).getLength()];
+
+ left -= lengthBuf.length + entryIdBuf.length + content.length;
+ inputStream.read(lengthBuf);
+ inputStream.read(entryIdBuf);
+ inputStream.read(content);
+ Assert.assertEquals(entries.get(i).getLength(), Ints.fromByteArray(lengthBuf));
+ Assert.assertEquals(i, Longs.fromByteArray(entryIdBuf));
+ assertArrayEquals(entries.get(i).getData(), content);
+ }
+ assertEquals(left, paddingLen);
+ byte padding[] = new byte[left];
+ inputStream.read(padding);
+
+ ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding);
+ for (int i = 0; i < paddingBuf.capacity() / 4; i++) {
+ Assert.assertEquals(Integer.toHexString(paddingBuf.readInt()),
+ Integer.toHexString(0xFEDCDEAD));
+ }
+
+ // 4. reach end.
+ Assert.assertEquals(inputStream.read(), -1);
+ Assert.assertEquals(inputStream.read(), -1);
+ inputStream.close();
+
+ assertEquals(entryBuffer.size(), 1);
+ }
+}
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderV2Test.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderV2Test.java
new file mode 100644
index 0000000..c881cea
--- /dev/null
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderV2Test.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.offload.jcloud.DataBlockHeader;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class DataBlockHeaderV2Test {
+
+ @Test
+ public void dataBlockHeaderImplTest() throws Exception {
+ int blockLength = 1024 * 1024;
+ long firstEntryId = 3333L;
+ long ledgerId = 3L;
+
+ StreamingDataBlockHeaderImpl dataBlockHeader = StreamingDataBlockHeaderImpl.of(blockLength,
+ ledgerId, firstEntryId);
+
+ // verify get methods
+ assertEquals(StreamingDataBlockHeaderImpl.getBlockMagicWord(), StreamingDataBlockHeaderImpl.MAGIC_WORD);
+ assertEquals(dataBlockHeader.getBlockLength(), blockLength);
+ assertEquals(dataBlockHeader.getFirstEntryId(), firstEntryId);
+ assertEquals(dataBlockHeader.getLedgerId(), ledgerId);
+
+ // verify toStream and fromStream
+ InputStream stream = dataBlockHeader.toStream();
+ stream.mark(0);
+ StreamingDataBlockHeaderImpl rebuild = StreamingDataBlockHeaderImpl.fromStream(stream);
+ assertEquals(rebuild.getBlockLength(), blockLength);
+ assertEquals(rebuild.getFirstEntryId(), firstEntryId);
+ assertEquals(rebuild.getLedgerId(), ledgerId);
+ // verify InputStream reach end
+ assertEquals(stream.read(), -1);
+
+ stream.reset();
+ byte streamContent[] = new byte[StreamingDataBlockHeaderImpl.getDataStartOffset()];
+
+ // stream with all 0, simulate junk data, should throw exception for header magic not match.
+ try (InputStream stream2 = new ByteArrayInputStream(streamContent, 0,
+ StreamingDataBlockHeaderImpl.getDataStartOffset())) {
+ DataBlockHeader rebuild2 = StreamingDataBlockHeaderImpl.fromStream(stream2);
+ fail("Should throw IOException");
+ } catch (Exception e) {
+ assertTrue(e instanceof IOException);
+ assertTrue(e.getMessage().contains("Data block header magic word not match"));
+ }
+
+ // simulate read header too small, throw EOFException.
+ stream.read(streamContent);
+ try (InputStream stream3 =
+ new ByteArrayInputStream(streamContent, 0,
+ StreamingDataBlockHeaderImpl.getDataStartOffset() - 1)) {
+ DataBlockHeader rebuild3 = StreamingDataBlockHeaderImpl.fromStream(stream3);
+ fail("Should throw EOFException");
+ } catch (EOFException e) {
+ // expected
+ }
+
+ stream.close();
+ }
+
+}
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
new file mode 100644
index 0000000..fc43376
--- /dev/null
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import com.google.common.base.Predicate;
+import io.netty.buffer.ByteBuf;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerMXBean;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
+
+@Slf4j
+public class MockManagedLedger implements ManagedLedger {
+ @Override
+ public String getName() {
+ return null;
+ }
+
+ @Override
+ public Position addEntry(byte[] data) throws InterruptedException, ManagedLedgerException {
+ return null;
+ }
+
+ @Override
+ public Position addEntry(byte[] data, int numberOfMessages) throws InterruptedException, ManagedLedgerException {
+ return null;
+ }
+
+ @Override
+ public void asyncAddEntry(byte[] data, AsyncCallbacks.AddEntryCallback callback, Object ctx) {
+
+ }
+
+ @Override
+ public Position addEntry(byte[] data, int offset, int length) throws InterruptedException, ManagedLedgerException {
+ return null;
+ }
+
+ @Override
+ public Position addEntry(byte[] data, int numberOfMessages, int offset, int length) throws InterruptedException,
+ ManagedLedgerException {
+ return null;
+ }
+
+ @Override
+ public void asyncAddEntry(byte[] data, int offset, int length, AsyncCallbacks.AddEntryCallback callback,
+ Object ctx) {
+
+ }
+
+ @Override
+ public void asyncAddEntry(byte[] data, int numberOfMessages, int offset, int length,
+ AsyncCallbacks.AddEntryCallback callback, Object ctx) {
+
+ }
+
+ @Override
+ public void asyncAddEntry(ByteBuf buffer, AsyncCallbacks.AddEntryCallback callback, Object ctx) {
+
+ }
+
+ @Override
+ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AsyncCallbacks.AddEntryCallback callback,
+ Object ctx) {
+
+ }
+
+ @Override
+ public ManagedCursor openCursor(String name) throws InterruptedException, ManagedLedgerException {
+ return null;
+ }
+
+ @Override
+ public ManagedCursor openCursor(String name, CommandSubscribe.InitialPosition initialPosition) throws
+ InterruptedException, ManagedLedgerException {
+ return null;
+ }
+
+ @Override
+ public ManagedCursor openCursor(String name, CommandSubscribe.InitialPosition initialPosition,
+ Map<String, Long> properties) throws InterruptedException, ManagedLedgerException {
+ return null;
+ }
+
+ @Override
+ public ManagedCursor newNonDurableCursor(Position startCursorPosition) throws ManagedLedgerException {
+ return null;
+ }
+
+ @Override
+ public ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName) throws
+ ManagedLedgerException {
+ return null;
+ }
+
+ @Override
+ public ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName,
+ CommandSubscribe.InitialPosition initialPosition) throws
+ ManagedLedgerException {
+ return null;
+ }
+
+ @Override
+ public void asyncDeleteCursor(String name, AsyncCallbacks.DeleteCursorCallback callback, Object ctx) {
+
+ }
+
+ @Override
+ public void deleteCursor(String name) throws InterruptedException, ManagedLedgerException {
+
+ }
+
+ @Override
+ public void asyncOpenCursor(String name, AsyncCallbacks.OpenCursorCallback callback, Object ctx) {
+
+ }
+
+ @Override
+ public void asyncOpenCursor(String name, CommandSubscribe.InitialPosition initialPosition,
+ AsyncCallbacks.OpenCursorCallback callback, Object ctx) {
+
+ }
+
+ @Override
+ public void asyncOpenCursor(String name, CommandSubscribe.InitialPosition initialPosition,
+ Map<String, Long> properties, AsyncCallbacks.OpenCursorCallback callback, Object ctx) {
+
+ }
+
+ @Override
+ public Iterable<ManagedCursor> getCursors() {
+ return null;
+ }
+
+ @Override
+ public Iterable<ManagedCursor> getActiveCursors() {
+ return null;
+ }
+
+ @Override
+ public long getNumberOfEntries() {
+ return 0;
+ }
+
+ @Override
+ public long getNumberOfActiveEntries() {
+ return 0;
+ }
+
+ @Override
+ public long getTotalSize() {
+ return 0;
+ }
+
+ @Override
+ public long getEstimatedBacklogSize() {
+ return 0;
+ }
+
+ @Override
+ public long getOffloadedSize() {
+ return 0;
+ }
+
+ @Override
+ public void asyncTerminate(AsyncCallbacks.TerminateCallback callback, Object ctx) {
+
+ }
+
+ @Override
+ public Position terminate() throws InterruptedException, ManagedLedgerException {
+ return null;
+ }
+
+ @Override
+ public void close() throws InterruptedException, ManagedLedgerException {
+
+ }
+
+ @Override
+ public void asyncClose(AsyncCallbacks.CloseCallback callback, Object ctx) {
+
+ }
+
+ @Override
+ public ManagedLedgerMXBean getStats() {
+ return null;
+ }
+
+ @Override
+ public void delete() throws InterruptedException, ManagedLedgerException {
+
+ }
+
+ @Override
+ public void asyncDelete(AsyncCallbacks.DeleteLedgerCallback callback, Object ctx) {
+
+ }
+
+ @Override
+ public Position offloadPrefix(Position pos) throws InterruptedException, ManagedLedgerException {
+ return null;
+ }
+
+ @Override
+ public void asyncOffloadPrefix(Position pos, AsyncCallbacks.OffloadCallback callback, Object ctx) {
+
+ }
+
+ @Override
+ public ManagedCursor getSlowestConsumer() {
+ return null;
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return false;
+ }
+
+ @Override
+ public ManagedLedgerConfig getConfig() {
+ return null;
+ }
+
+ @Override
+ public void setConfig(ManagedLedgerConfig config) {
+
+ }
+
+ @Override
+ public Position getLastConfirmedEntry() {
+ return null;
+ }
+
+ @Override
+ public void readyToCreateNewLedger() {
+
+ }
+
+ @Override
+ public Map<String, String> getProperties() {
+ return null;
+ }
+
+ @Override
+ public void setProperty(String key, String value) throws InterruptedException, ManagedLedgerException {
+
+ }
+
+ @Override
+ public void asyncSetProperty(String key, String value, AsyncCallbacks.UpdatePropertiesCallback callback,
+ Object ctx) {
+
+ }
+
+ @Override
+ public void deleteProperty(String key) throws InterruptedException, ManagedLedgerException {
+
+ }
+
+ @Override
+ public void asyncDeleteProperty(String key, AsyncCallbacks.UpdatePropertiesCallback callback, Object ctx) {
+
+ }
+
+ @Override
+ public void setProperties(Map<String, String> properties) throws InterruptedException, ManagedLedgerException {
+
+ }
+
+ @Override
+ public void asyncSetProperties(Map<String, String> properties,
+ AsyncCallbacks.UpdatePropertiesCallback callback, Object ctx) {
+
+ }
+
+ @Override
+ public void trimConsumedLedgersInBackground(CompletableFuture<?> promise) {
+
+ }
+
+ @Override
+ public void rollCurrentLedgerIfFull() {
+
+ }
+
+ @Override
+ public CompletableFuture<Position> asyncFindPosition(Predicate<Entry> predicate) {
+ return null;
+ }
+
+ @Override
+ public ManagedLedgerInterceptor getManagedLedgerInterceptor() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId) {
+ final LedgerInfo build = LedgerInfo.newBuilder().setLedgerId(ledgerId).setSize(100).setEntries(20).build();
+ return CompletableFuture.completedFuture(build);
+ }
+}
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexTest.java
index 4d6f3e7..a5dd8a3 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexTest.java
@@ -22,7 +22,6 @@ import static com.google.common.base.Charsets.UTF_8;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
@@ -39,6 +38,7 @@ import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.testng.annotations.Test;
@@ -82,7 +82,7 @@ public class OffloadIndexTest {
// }
// }
- private LedgerMetadata createLedgerMetadata(long id) throws Exception {
+ public static LedgerMetadata createLedgerMetadata(long id) throws Exception {
Map<String, byte[]> metadataCustom = Maps.newHashMap();
metadataCustom.put("key1", "value1".getBytes(UTF_8));
@@ -92,7 +92,7 @@ public class OffloadIndexTest {
bookies.add(0, new BookieSocketAddress("127.0.0.1:3181").toBookieId());
bookies.add(1, new BookieSocketAddress("127.0.0.2:3181").toBookieId());
bookies.add(2, new BookieSocketAddress("127.0.0.3:3181").toBookieId());
-
+
return LedgerMetadataBuilder.create().withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(2)
.withDigestType(DigestType.CRC32C).withPassword("password".getBytes(UTF_8))
.withCustomMetadata(metadataCustom).withClosedState().withLastEntryId(5000).withLength(100)
@@ -100,6 +100,15 @@ public class OffloadIndexTest {
}
+ public static LedgerInfo createLedgerInfo(long id) throws Exception {
+
+ Map<String, byte[]> metadataCustom = Maps.newHashMap();
+ metadataCustom.put("key1", "value1".getBytes(UTF_8));
+ metadataCustom.put("key7", "value7".getBytes(UTF_8));
+
+ return LedgerInfo.newBuilder().setLedgerId(id).setEntries(5001).setSize(10000).build();
+ }
+
// prepare metadata, then use builder to build a OffloadIndexBlockImpl
// verify get methods, readout and fromStream methods.
@Test
@@ -122,7 +131,7 @@ public class OffloadIndexTest {
// verify getIndexEntryForEntry
OffloadIndexEntry entry1 = indexBlock.getIndexEntryForEntry(0);
assertEquals(entry1.getEntryId(), 0);
- assertEquals(entry1.getPartId(),2);
+ assertEquals(entry1.getPartId(), 2);
assertEquals(entry1.getOffset(), 0);
OffloadIndexEntry entry11 = indexBlock.getIndexEntryForEntry(500);
@@ -185,7 +194,7 @@ public class OffloadIndexTest {
OffloadIndexEntry e3 = OffloadIndexEntryImpl.of(wrapper.readLong(), wrapper.readInt(),
wrapper.readLong(), dataHeaderLength);
- assertEquals(e1.getEntryId(),entry1.getEntryId());
+ assertEquals(e1.getEntryId(), entry1.getEntryId());
assertEquals(e1.getPartId(), entry1.getPartId());
assertEquals(e1.getOffset(), entry1.getOffset());
assertEquals(e1.getDataOffset(), entry1.getDataOffset());
@@ -203,7 +212,7 @@ public class OffloadIndexTest {
InputStream out2 = indexBlock.toStream();
int streamLength = out2.available();
out2.mark(0);
- OffloadIndexBlock indexBlock2 = blockBuilder.fromStream(out2);
+ OffloadIndexBlock indexBlock2 = (OffloadIndexBlock) blockBuilder.fromStream(out2);
// 1. verify metadata that got from inputstream success.
LedgerMetadata metadata2 = indexBlock2.getLedgerMetadata();
log.debug("built metadata: {}", metadata2.toString());
@@ -221,7 +230,7 @@ public class OffloadIndexTest {
byte streamContent[] = new byte[streamLength];
// stream with all 0, simulate junk data, should throw exception for header magic not match.
try(InputStream stream3 = new ByteArrayInputStream(streamContent, 0, streamLength)) {
- OffloadIndexBlock indexBlock3 = blockBuilder.fromStream(stream3);
+ OffloadIndexBlock indexBlock3 = (OffloadIndexBlock) blockBuilder.fromStream(stream3);
fail("Should throw IOException");
} catch (Exception e) {
assertTrue(e instanceof IOException);
@@ -232,7 +241,7 @@ public class OffloadIndexTest {
out2.read(streamContent);
try(InputStream stream4 =
new ByteArrayInputStream(streamContent, 0, streamLength - 1)) {
- OffloadIndexBlock indexBlock4 = blockBuilder.fromStream(stream4);
+ OffloadIndexBlock indexBlock4 = (OffloadIndexBlock) blockBuilder.fromStream(stream4);
fail("Should throw EOFException");
} catch (Exception e) {
assertTrue(e instanceof java.io.EOFException);
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexV2Test.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexV2Test.java
new file mode 100644
index 0000000..dd3df41
--- /dev/null
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexV2Test.java
@@ -0,0 +1,327 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2Builder;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry;
+import org.apache.bookkeeper.mledger.offload.jcloud.impl.OffloadIndexBlockV2Impl.CompatibleMetadata;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class OffloadIndexV2Test {
+
+ // prepare metadata, then use builder to build a StreamingOffloadIndexBlockImpl
+ // verify get methods, readout and fromStream methods.
+ @Test
+ public void streamingOffloadIndexBlockImplTest() throws Exception {
+ OffloadIndexBlockV2Builder blockBuilder = OffloadIndexBlockV2Builder.create();
+ final long ledgerId = 1; // use dummy ledgerId, from BK 4.12 the ledger is is required
+ LedgerInfo metadata = OffloadIndexTest.createLedgerInfo(ledgerId);
+ log.debug("created metadata: {}", metadata.toString());
+
+ blockBuilder.addLedgerMeta(ledgerId, metadata).withDataObjectLength(1).withDataBlockHeaderLength(23455);
+
+ blockBuilder.addBlock(ledgerId, 0, 2, 64 * 1024 * 1024);
+ blockBuilder.addBlock(ledgerId, 1000, 3, 64 * 1024 * 1024);
+ blockBuilder.addBlock(ledgerId, 2000, 4, 64 * 1024 * 1024);
+ OffloadIndexBlockV2 indexBlock = blockBuilder.buildV2();
+
+ // verify getEntryCount and getLedgerMetadata
+ assertEquals(indexBlock.getEntryCount(), 3);
+ assertEquals(indexBlock.getLedgerMetadata(ledgerId), new CompatibleMetadata(metadata));
+
+ // verify getIndexEntryForEntry
+ OffloadIndexEntry entry1 = indexBlock.getIndexEntryForEntry(ledgerId, 0);
+ assertEquals(entry1.getEntryId(), 0);
+ assertEquals(entry1.getPartId(), 2);
+ assertEquals(entry1.getOffset(), 0);
+
+ OffloadIndexEntry entry11 = indexBlock.getIndexEntryForEntry(ledgerId, 500);
+ assertEquals(entry11, entry1);
+
+ OffloadIndexEntry entry2 = indexBlock.getIndexEntryForEntry(ledgerId, 1000);
+ assertEquals(entry2.getEntryId(), 1000);
+ assertEquals(entry2.getPartId(), 3);
+ assertEquals(entry2.getOffset(), 64 * 1024 * 1024);
+
+ OffloadIndexEntry entry22 = indexBlock.getIndexEntryForEntry(ledgerId, 1300);
+ assertEquals(entry22, entry2);
+
+ OffloadIndexEntry entry3 = indexBlock.getIndexEntryForEntry(ledgerId, 2000);
+
+ assertEquals(entry3.getEntryId(), 2000);
+ assertEquals(entry3.getPartId(), 4);
+ assertEquals(entry3.getOffset(), 2 * 64 * 1024 * 1024);
+
+ OffloadIndexEntry entry33 = indexBlock.getIndexEntryForEntry(ledgerId, 3000);
+ assertEquals(entry33, entry3);
+
+ try {
+ OffloadIndexEntry entry4 = indexBlock.getIndexEntryForEntry(ledgerId, 6000);
+ fail("Should throw IndexOutOfBoundsException.");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ assertEquals(e.getMessage(), "Entry index: 6000 beyond lastEntryId: 5000");
+ }
+
+ // verify toStream
+ InputStream out = indexBlock.toStream();
+ byte b[] = new byte[1024];
+ int readoutLen = out.read(b);
+ out.close();
+ ByteBuf wrapper = Unpooled.wrappedBuffer(b);
+ int magic = wrapper.readInt();
+ int indexBlockLength = wrapper.readInt();
+ long dataObjectLength = wrapper.readLong();
+ long dataHeaderLength = wrapper.readLong();
+ assertEquals(ledgerId, wrapper.readLong());
+ int indexEntryCount = wrapper.readInt();
+ int segmentMetadataLength = wrapper.readInt();
+
+ // verify counter
+ assertEquals(magic, OffloadIndexBlockV2Impl.getIndexMagicWord());
+ assertEquals(indexBlockLength, readoutLen);
+ assertEquals(indexEntryCount, 3);
+ assertEquals(dataObjectLength, 1);
+ assertEquals(dataHeaderLength, 23455);
+
+ wrapper.readBytes(segmentMetadataLength);
+ log.debug("magic: {}, blockLength: {}, metadataLength: {}, indexCount: {}",
+ magic, indexBlockLength, segmentMetadataLength, indexEntryCount);
+
+ // verify entry
+ OffloadIndexEntry e1 = OffloadIndexEntryImpl.of(wrapper.readLong(), wrapper.readInt(),
+ wrapper.readLong(), dataHeaderLength);
+ OffloadIndexEntry e2 = OffloadIndexEntryImpl.of(wrapper.readLong(), wrapper.readInt(),
+ wrapper.readLong(), dataHeaderLength);
+ OffloadIndexEntry e3 = OffloadIndexEntryImpl.of(wrapper.readLong(), wrapper.readInt(),
+ wrapper.readLong(), dataHeaderLength);
+
+ assertEquals(e1.getEntryId(), entry1.getEntryId());
+ assertEquals(e1.getPartId(), entry1.getPartId());
+ assertEquals(e1.getOffset(), entry1.getOffset());
+ assertEquals(e1.getDataOffset(), entry1.getDataOffset());
+ assertEquals(e2.getEntryId(), entry2.getEntryId());
+ assertEquals(e2.getPartId(), entry2.getPartId());
+ assertEquals(e2.getOffset(), entry2.getOffset());
+ assertEquals(e2.getDataOffset(), entry2.getDataOffset());
+ assertEquals(e3.getEntryId(), entry3.getEntryId());
+ assertEquals(e3.getPartId(), entry3.getPartId());
+ assertEquals(e3.getOffset(), entry3.getOffset());
+ assertEquals(e3.getDataOffset(), entry3.getDataOffset());
+ wrapper.release();
+
+ // verify build StreamingOffloadIndexBlock from InputStream
+ InputStream out2 = indexBlock.toStream();
+ int streamLength = out2.available();
+ out2.mark(0);
+ OffloadIndexBlockV2 indexBlock2 = blockBuilder.fromStream(out2);
+ // 1. verify metadata that got from inputstream success.
+ LedgerMetadata metadata2 = indexBlock2.getLedgerMetadata(ledgerId);
+ log.debug("built metadata: {}", metadata2.toString());
+ assertEquals(metadata.getLedgerId(), metadata2.getLedgerId());
+ assertEquals(metadata.getEntries() - 1, metadata2.getLastEntryId());
+ assertEquals(metadata.getSize(), metadata2.getLength());
+ // 2. verify set all the entries
+ assertEquals(indexBlock2.getEntryCount(), indexBlock.getEntryCount());
+ // 3. verify reach end
+ assertEquals(out2.read(), -1);
+
+
+ out2.reset();
+ byte streamContent[] = new byte[streamLength];
+ // stream with all 0, simulate junk data, should throw exception for header magic not match.
+ try (InputStream stream3 = new ByteArrayInputStream(streamContent, 0, streamLength)) {
+ OffloadIndexBlockV2 indexBlock3 = blockBuilder.fromStream(stream3);
+ fail("Should throw IOException");
+ } catch (Exception e) {
+ assertTrue(e instanceof IOException);
+ assertTrue(e.getMessage().contains("Invalid MagicWord"));
+ }
+
+ // simulate read header too small, throw EOFException.
+ out2.read(streamContent);
+ try (InputStream stream4 =
+ new ByteArrayInputStream(streamContent, 0, streamLength - 1)) {
+ OffloadIndexBlockV2 indexBlock4 = blockBuilder.fromStream(stream4);
+ fail("Should throw EOFException");
+ } catch (Exception e) {
+ assertTrue(e instanceof java.io.EOFException);
+ }
+
+ out2.close();
+ indexBlock.close();
+ }
+
+ @Test
+ public void streamingMultiLedgerOffloadIndexBlockImplTest() throws Exception {
+ OffloadIndexBlockV2Builder blockBuilder = OffloadIndexBlockV2Builder.create();
+ final long ledgerId1 = 1; // use dummy ledgerId, from BK 4.12 the ledger is is required
+ final long ledgerId2 = 2;
+ LedgerInfo metadata1 = OffloadIndexTest.createLedgerInfo(ledgerId1);
+ LedgerInfo metadata2 = OffloadIndexTest.createLedgerInfo(ledgerId2);
+ log.debug("created metadata: {}", metadata1.toString());
+ log.debug("created metadata: {}", metadata2.toString());
+
+ blockBuilder.addLedgerMeta(ledgerId1, metadata1)
+ .addLedgerMeta(ledgerId2, metadata2)
+ .withDataObjectLength(1)
+ .withDataBlockHeaderLength(23455);
+
+ blockBuilder.addBlock(ledgerId1, 1000, 2, 64 * 1024 * 1024);
+ blockBuilder.addBlock(ledgerId2, 0, 3, 64 * 1024 * 1024);
+ blockBuilder.addBlock(ledgerId2, 1000, 4, 64 * 1024 * 1024);
+ OffloadIndexBlockV2 indexBlock = blockBuilder.buildV2();
+
+ // verify getEntryCount and getLedgerMetadata
+ assertEquals(indexBlock.getEntryCount(), 3);
+ assertEquals(indexBlock.getLedgerMetadata(ledgerId1),
+ new CompatibleMetadata(metadata1));
+ assertEquals(indexBlock.getLedgerMetadata(ledgerId2), new CompatibleMetadata(metadata2));
+
+ // verify getIndexEntryForEntry
+ OffloadIndexEntry entry1 = indexBlock.getIndexEntryForEntry(ledgerId1, 1000);
+ assertEquals(entry1.getEntryId(), 1000);
+ assertEquals(entry1.getPartId(), 2);
+ assertEquals(entry1.getOffset(), 0);
+
+ OffloadIndexEntry entry11 = indexBlock.getIndexEntryForEntry(ledgerId1, 1500);
+ assertEquals(entry11, entry1);
+
+ OffloadIndexEntry entry2 = indexBlock.getIndexEntryForEntry(ledgerId2, 0);
+ assertEquals(entry2.getEntryId(), 0);
+ assertEquals(entry2.getPartId(), 3);
+ assertEquals(entry2.getOffset(), 64 * 1024 * 1024);
+
+ OffloadIndexEntry entry22 = indexBlock.getIndexEntryForEntry(ledgerId2, 300);
+ assertEquals(entry22, entry2);
+
+ OffloadIndexEntry entry3 = indexBlock.getIndexEntryForEntry(ledgerId2, 1000);
+
+ assertEquals(entry3.getEntryId(), 1000);
+ assertEquals(entry3.getPartId(), 4);
+ assertEquals(entry3.getOffset(), 2 * 64 * 1024 * 1024);
+
+ OffloadIndexEntry entry33 = indexBlock.getIndexEntryForEntry(ledgerId2, 2000);
+ assertEquals(entry33, entry3);
+
+ try {
+ OffloadIndexEntry entry4 = indexBlock.getIndexEntryForEntry(ledgerId2, 6000);
+ fail("Should throw IndexOutOfBoundsException.");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ assertEquals(e.getMessage(), "Entry index: 6000 beyond lastEntryId: 5000");
+ }
+
+ // verify toStream
+ InputStream out = indexBlock.toStream();
+ byte b[] = new byte[1024];
+ int readoutLen = out.read(b);
+ out.close();
+ ByteBuf wrapper = Unpooled.wrappedBuffer(b);
+ int magic = wrapper.readInt();
+ int indexBlockLength = wrapper.readInt();
+ long dataObjectLength = wrapper.readLong();
+ long dataHeaderLength = wrapper.readLong();
+ assertEquals(ledgerId1, wrapper.readLong());
+ int indexEntryCount = wrapper.readInt();
+ int segmentMetadataLength = wrapper.readInt();
+
+ // verify counter
+ assertEquals(magic, OffloadIndexBlockV2Impl.getIndexMagicWord());
+ assertEquals(indexBlockLength, readoutLen);
+ assertEquals(indexEntryCount, 1);
+ assertEquals(dataObjectLength, 1);
+ assertEquals(dataHeaderLength, 23455);
+
+ wrapper.readBytes(segmentMetadataLength);
+ log.debug("magic: {}, blockLength: {}, metadataLength: {}, indexCount: {}",
+ magic, indexBlockLength, segmentMetadataLength, indexEntryCount);
+
+ // verify entry
+ OffloadIndexEntry e1 = OffloadIndexEntryImpl.of(wrapper.readLong(), wrapper.readInt(),
+ wrapper.readLong(), dataHeaderLength);
+
+ assertEquals(e1.getEntryId(), entry1.getEntryId());
+ assertEquals(e1.getPartId(), entry1.getPartId());
+ assertEquals(e1.getOffset(), entry1.getOffset());
+ assertEquals(e1.getDataOffset(), entry1.getDataOffset());
+
+
+ assertEquals(ledgerId2, wrapper.readLong());
+ int indexEntryCount2 = wrapper.readInt();
+ assertEquals(indexEntryCount2, 2);
+ int segmentMetadataLength2 = wrapper.readInt();
+ wrapper.readBytes(segmentMetadataLength2);
+
+ OffloadIndexEntry e2 = OffloadIndexEntryImpl.of(wrapper.readLong(), wrapper.readInt(),
+ wrapper.readLong(), dataHeaderLength);
+ OffloadIndexEntry e3 = OffloadIndexEntryImpl.of(wrapper.readLong(), wrapper.readInt(),
+ wrapper.readLong(), dataHeaderLength);
+
+ assertEquals(e2.getEntryId(), entry2.getEntryId());
+ assertEquals(e2.getPartId(), entry2.getPartId());
+ assertEquals(e2.getOffset(), entry2.getOffset());
+ assertEquals(e2.getDataOffset(), entry2.getDataOffset());
+ assertEquals(e3.getEntryId(), entry3.getEntryId());
+ assertEquals(e3.getPartId(), entry3.getPartId());
+ assertEquals(e3.getOffset(), entry3.getOffset());
+ assertEquals(e3.getDataOffset(), entry3.getDataOffset());
+ wrapper.release();
+
+ // verify build StreamingOffloadIndexBlock from InputStream
+ InputStream out2 = indexBlock.toStream();
+ int streamLength = out2.available();
+ out2.mark(0);
+ OffloadIndexBlockV2 indexBlock2 = blockBuilder.fromStream(out2);
+ // 1. verify metadata that got from inputstream success.
+ //TODO change to meaningful things
+// LedgerMetadata metadata1back = indexBlock2.getLedgerMetadata(ledgerId1);
+// log.debug("built metadata: {}", metadata1back.toString());
+// assertEquals(metadata1back.getAckQuorumSize(), metadata1.getAckQuorumSize());
+// assertEquals(metadata1back.getEnsembleSize(), metadata1.getEnsembleSize());
+// assertEquals(metadata1back.getDigestType(), metadata1.getDigestType());
+// assertEquals(metadata1back.getAllEnsembles().entrySet(), metadata1.getAllEnsembles().entrySet());
+// LedgerMetadata metadata2back = indexBlock2.getLedgerMetadata(ledgerId2);
+// log.debug("built metadata: {}", metadata2back.toString());
+// assertEquals(metadata2back.getAckQuorumSize(), metadata1.getAckQuorumSize());
+// assertEquals(metadata2back.getEnsembleSize(), metadata1.getEnsembleSize());
+// assertEquals(metadata2back.getDigestType(), metadata1.getDigestType());
+// assertEquals(metadata2back.getAllEnsembles().entrySet(), metadata1.getAllEnsembles().entrySet());
+ // 2. verify set all the entries
+ assertEquals(indexBlock2.getEntryCount(), indexBlock.getEntryCount());
+ // 3. verify reach end
+ assertEquals(out2.read(), -1);
+
+ out2.close();
+ indexBlock.close();
+ }
+}
diff --git a/tiered-storage/jcloud/src/test/resources/log4j2-test.yml b/tiered-storage/jcloud/src/test/resources/log4j2-test.yml
new file mode 100644
index 0000000..46311be
--- /dev/null
+++ b/tiered-storage/jcloud/src/test/resources/log4j2-test.yml
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+
+Configuration:
+ status: warn
+ name: YAMLConfigTest
+ properties:
+ property:
+ name: filename
+ value: target/test-yaml.log
+ thresholdFilter:
+ level: debug
+ appenders:
+ Console:
+ name: STDOUT
+ target: SYSTEM_OUT
+ PatternLayout:
+ Pattern: "%d{HH:mm:ss.SSS} [%t:%C@%L] %-5level %logger{36} - %msg%n"
+ File:
+ name: File
+ fileName: ${filename}
+ PatternLayout:
+ Pattern: "%d %p %C{1.} [%t] %m%n"
+ Filters:
+ ThresholdFilter:
+ level: error
+
+ Loggers:
+ logger:
+ - name: org.apache.bookkeeper.client.PulsarMockReadHandle
+ level: info
+ additivity: false
+ AppenderRef:
+ ref: STDOUT
+ - name: org.apache.logging.log4j.test2
+ level: debug
+ additivity: false
+ AppenderRef:
+ ref: File
+ Root:
+ level: debug
+ AppenderRef:
+ ref: STDOUT