Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/16 17:19:14 UTC
[incubator-pulsar] branch master updated: ReadHandle implementation
backed by S3 (#1790)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f0b1471 ReadHandle implementation backed by S3 (#1790)
f0b1471 is described below
commit f0b1471a438ea0b4f77c1c00e0b4c64eb57074f7
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Wed May 16 19:19:11 2018 +0200
ReadHandle implementation backed by S3 (#1790)
Implementation of bookkeeper ReadHandle, which reads from an S3
object.
Master Issue: #1511
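For readers of this patch, a minimal usage sketch (hypothetical, not part of this commit) of reading an offloaded ledger back through the new code path. It assumes a ServiceConfiguration already set up for S3 offload, the offload scheduler, and the ledger id and UUID recorded at offload time; the method name and the printing are placeholders.

    // Hypothetical sketch; mirrors what testOffloadAndRead below exercises.
    static void readBackOffloadedLedger(ServiceConfiguration conf,
                                        ScheduledExecutorService scheduler,
                                        long ledgerId, UUID uuid) throws Exception {
        LedgerOffloader offloader = S3ManagedLedgerOffloader.create(conf, scheduler);
        // readOffloaded() now returns a ReadHandle backed by the S3 data object
        try (ReadHandle offloaded = offloader.readOffloaded(ledgerId, uuid).get();
             LedgerEntries entries = offloaded.read(0, offloaded.getLastAddConfirmed())) {
            Iterator<LedgerEntry> it = entries.iterator();
            while (it.hasNext()) {
                LedgerEntry entry = it.next();
                // payloads are fetched through the buffered S3-backed input stream
                System.out.println("entry " + entry.getEntryId()
                                   + ": " + entry.getLength() + " bytes");
            }
        }
    }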
---
conf/broker.conf | 3 +
.../apache/pulsar/broker/ServiceConfiguration.java | 12 +
.../pulsar/broker/s3offload/OffloadIndexEntry.java | 7 +-
.../broker/s3offload/S3ManagedLedgerOffloader.java | 37 ++-
.../s3offload/impl/OffloadIndexEntryImpl.java | 4 +
.../s3offload/impl/S3BackedReadHandleImpl.java | 207 +++++++++++++++
.../s3offload/S3ManagedLedgerOffloaderTest.java | 278 ++++++++++++---------
7 files changed, 421 insertions(+), 127 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index cf9faa8..39523cd 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -495,6 +495,9 @@ s3ManagedLedgerOffloadServiceEndpoint=
# For Amazon S3 ledger offload, Max block size in bytes.
s3ManagedLedgerOffloadMaxBlockSizeInBytes=67108864
+# For Amazon S3 ledger offload, Read buffer size in bytes (1MB by default)
+s3ManagedLedgerOffloadReadBufferSizeInBytes=1048576
+
### --- Deprecated config variables --- ###
# Deprecated. Use configurationStoreServers
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 74363cc..d28ff4c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -487,6 +487,10 @@ public class ServiceConfiguration implements PulsarConfiguration {
// For Amazon S3 ledger offload, Max block size in bytes.
private int s3ManagedLedgerOffloadMaxBlockSizeInBytes = 64 * 1024 * 1024;
+ // For Amazon S3 ledger offload, Read buffer size in bytes.
+ @FieldContext(minValue = 1024)
+ private int s3ManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; // 1MB
+
public String getZookeeperServers() {
return zookeeperServers;
}
@@ -1694,4 +1698,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
return this.s3ManagedLedgerOffloadMaxBlockSizeInBytes;
}
+ public void setS3ManagedLedgerOffloadReadBufferSizeInBytes(int readBufferSizeInBytes) {
+ this.s3ManagedLedgerOffloadReadBufferSizeInBytes = readBufferSizeInBytes;
+ }
+
+ public int getS3ManagedLedgerOffloadReadBufferSizeInBytes() {
+ return this.s3ManagedLedgerOffloadReadBufferSizeInBytes;
+ }
+
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexEntry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexEntry.java
index 03927d3..6a976ff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexEntry.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexEntry.java
@@ -43,8 +43,13 @@ public interface OffloadIndexEntry {
int getPartId();
/**
- * Get the offset of this message entry in code storage.
+ * Get the offset of this block within the object.
*/
long getOffset();
+
+ /**
+ * Get the offset of the block's data within the object.
+ */
+ long getDataOffset();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
index 76920de..4d5b388 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
@@ -45,6 +45,7 @@ import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl;
+import org.apache.pulsar.broker.s3offload.impl.S3BackedReadHandleImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +58,7 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
private final String bucket;
// max block size for each data block.
private int maxBlockSize;
+ private final int readBufferSize;
public static S3ManagedLedgerOffloader create(ServiceConfiguration conf,
ScheduledExecutorService scheduler)
@@ -65,6 +67,7 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
String bucket = conf.getS3ManagedLedgerOffloadBucket();
String endpoint = conf.getS3ManagedLedgerOffloadServiceEndpoint();
int maxBlockSize = conf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes();
+ int readBufferSize = conf.getS3ManagedLedgerOffloadReadBufferSizeInBytes();
if (Strings.isNullOrEmpty(region)) {
throw new PulsarServerException("s3ManagedLedgerOffloadRegion cannot be empty if s3 offload is enabled");
@@ -80,22 +83,24 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
} else {
builder.setRegion(region);
}
- return new S3ManagedLedgerOffloader(builder.build(), bucket, scheduler, maxBlockSize);
+ return new S3ManagedLedgerOffloader(builder.build(), bucket, scheduler, maxBlockSize, readBufferSize);
}
- S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, ScheduledExecutorService scheduler, int maxBlockSize) {
+ S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, ScheduledExecutorService scheduler,
+ int maxBlockSize, int readBufferSize) {
this.s3client = s3client;
this.bucket = bucket;
this.scheduler = scheduler;
this.maxBlockSize = maxBlockSize;
+ this.readBufferSize = readBufferSize;
}
- static String dataBlockOffloadKey(ReadHandle readHandle, UUID uuid) {
- return String.format("ledger-%d-%s", readHandle.getId(), uuid.toString());
+ static String dataBlockOffloadKey(long ledgerId, UUID uuid) {
+ return String.format("ledger-%d-%s", ledgerId, uuid.toString());
}
- static String indexBlockOffloadKey(ReadHandle readHandle, UUID uuid) {
- return String.format("ledger-%d-%s-index", readHandle.getId(), uuid.toString());
+ static String indexBlockOffloadKey(long ledgerId, UUID uuid) {
+ return String.format("ledger-%d-%s-index", ledgerId, uuid.toString());
}
// upload DataBlock to s3 using MultiPartUpload, and indexBlock in a new Block,
@@ -107,8 +112,8 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
scheduler.submit(() -> {
OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create()
.withMetadata(readHandle.getLedgerMetadata());
- String dataBlockKey = dataBlockOffloadKey(readHandle, uuid);
- String indexBlockKey = indexBlockOffloadKey(readHandle, uuid);
+ String dataBlockKey = dataBlockOffloadKey(readHandle.getId(), uuid);
+ String indexBlockKey = indexBlockOffloadKey(readHandle.getId(), uuid);
InitiateMultipartUploadRequest dataBlockReq = new InitiateMultipartUploadRequest(bucket, dataBlockKey);
InitiateMultipartUploadResult dataBlockRes = null;
@@ -174,12 +179,12 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
metadata.setContentLength(indexStream.available());
s3client.putObject(new PutObjectRequest(
bucket,
- indexBlockOffloadKey(readHandle, uuid),
+ indexBlockKey,
indexStream,
metadata));
promise.complete(null);
} catch (Throwable t) {
- s3client.deleteObject(bucket, dataBlockOffloadKey(readHandle, uuid));
+ s3client.deleteObject(bucket, dataBlockKey);
promise.completeExceptionally(t);
return;
}
@@ -190,7 +195,17 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
@Override
public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid) {
CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
- promise.completeExceptionally(new UnsupportedOperationException());
+ String key = dataBlockOffloadKey(ledgerId, uid);
+ String indexKey = indexBlockOffloadKey(ledgerId, uid);
+ scheduler.submit(() -> {
+ try {
+ promise.complete(S3BackedReadHandleImpl.open(scheduler, s3client,
+ bucket, key, indexKey,
+ ledgerId, readBufferSize));
+ } catch (Throwable t) {
+ promise.completeExceptionally(t);
+ }
+ });
return promise;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java
index d74ba93..d8d2267 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java
@@ -48,6 +48,10 @@ public class OffloadIndexEntryImpl implements OffloadIndexEntry {
public long getOffset() {
return offset;
}
+ @Override
+ public long getDataOffset() {
+ return offset + DataBlockHeaderImpl.getDataStartOffset();
+ }
public OffloadIndexEntryImpl(long entryId, int partId, long offset) {
this.entryId = entryId;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java
new file mode 100644
index 0000000..037ea67
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload.impl;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+
+import org.apache.pulsar.broker.s3offload.OffloadIndexBlock;
+import org.apache.pulsar.broker.s3offload.OffloadIndexBlockBuilder;
+import org.apache.pulsar.broker.s3offload.OffloadIndexEntry;
+import org.apache.pulsar.broker.s3offload.S3BackedInputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class S3BackedReadHandleImpl implements ReadHandle {
+ private static final Logger log = LoggerFactory.getLogger(S3BackedReadHandleImpl.class);
+
+ private final long ledgerId;
+ private final OffloadIndexBlock index;
+ private final S3BackedInputStream inputStream;
+ private final DataInputStream dataStream;
+ private final ExecutorService executor;
+
+ private S3BackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
+ S3BackedInputStream inputStream,
+ ExecutorService executor) {
+ this.ledgerId = ledgerId;
+ this.index = index;
+ this.inputStream = inputStream;
+ this.dataStream = new DataInputStream(inputStream);
+ this.executor = executor;
+ }
+
+ @Override
+ public long getId() {
+ return ledgerId;
+ }
+
+ @Override
+ public LedgerMetadata getLedgerMetadata() {
+ return index.getLedgerMetadata();
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ executor.submit(() -> {
+ try {
+ index.close();
+ inputStream.close();
+ promise.complete(null);
+ } catch (IOException t) {
+ promise.completeExceptionally(t);
+ }
+ });
+ return promise;
+ }
+
+ @Override
+ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
+ log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
+ CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
+ executor.submit(() -> {
+ if (firstEntry > lastEntry
+ || firstEntry < 0
+ || lastEntry > getLastAddConfirmed()) {
+ promise.completeExceptionally(new BKException.BKIncorrectParameterException());
+ return;
+ }
+ long entriesToRead = (lastEntry - firstEntry) + 1;
+ List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
+ long nextExpectedId = firstEntry;
+ try {
+ OffloadIndexEntry entry = index.getIndexEntryForEntry(firstEntry);
+ inputStream.seek(entry.getDataOffset());
+
+ while (entriesToRead > 0) {
+ int length = dataStream.readInt();
+ if (length < 0) { // hit padding or new block
+ inputStream.seekForward(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
+ length = dataStream.readInt();
+ }
+ long entryId = dataStream.readLong();
+
+ if (entryId == nextExpectedId) {
+ ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(length, length);
+ entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
+ int toWrite = length;
+ while (toWrite > 0) {
+ toWrite -= buf.writeBytes(dataStream, toWrite);
+ }
+ entriesToRead--;
+ nextExpectedId++;
+ } else if (entryId > lastEntry) {
+ log.info("Expected to read {}, but read {}, which is greater than last entry {}",
+ nextExpectedId, entryId, lastEntry);
+ throw new BKException.BKUnexpectedConditionException();
+ } else {
+ inputStream.skip(length);
+ }
+ }
+
+ promise.complete(LedgerEntriesImpl.create(entries));
+ } catch (Throwable t) {
+ promise.completeExceptionally(t);
+ entries.forEach(LedgerEntry::close);
+ }
+ });
+ return promise;
+ }
+
+ @Override
+ public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) {
+ return readAsync(firstEntry, lastEntry);
+ }
+
+ @Override
+ public CompletableFuture<Long> readLastAddConfirmedAsync() {
+ return CompletableFuture.completedFuture(getLastAddConfirmed());
+ }
+
+ @Override
+ public CompletableFuture<Long> tryReadLastAddConfirmedAsync() {
+ return CompletableFuture.completedFuture(getLastAddConfirmed());
+ }
+
+ @Override
+ public long getLastAddConfirmed() {
+ return getLedgerMetadata().getLastEntryId();
+ }
+
+ @Override
+ public long getLength() {
+ return getLedgerMetadata().getLength();
+ }
+
+ @Override
+ public boolean isClosed() {
+ return getLedgerMetadata().isClosed();
+ }
+
+ @Override
+ public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId,
+ long timeOutInMillis,
+ boolean parallel) {
+ CompletableFuture<LastConfirmedAndEntry> promise = new CompletableFuture<>();
+ promise.completeExceptionally(new UnsupportedOperationException());
+ return promise;
+ }
+
+ public static ReadHandle open(ScheduledExecutorService executor,
+ AmazonS3 s3client, String bucket, String key, String indexKey,
+ long ledgerId, int readBufferSize)
+ throws AmazonClientException, IOException {
+ GetObjectRequest req = new GetObjectRequest(bucket, indexKey);
+ try (S3Object obj = s3client.getObject(req)) {
+ OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create();
+ OffloadIndexBlock index = indexBuilder.fromStream(obj.getObjectContent());
+
+ ObjectMetadata dataMetadata = s3client.getObjectMetadata(bucket, key); // FIXME: this should be part of index
+ S3BackedInputStream inputStream = new S3BackedInputStreamImpl(s3client, bucket, key,
+ dataMetadata.getContentLength(),
+ readBufferSize);
+ return new S3BackedReadHandleImpl(ledgerId, index, inputStream, executor);
+ }
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
index 583b0e0..f9a043b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
@@ -24,20 +24,29 @@ import static org.mockito.Matchers.any;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.S3Object;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.DataInputStream;
+import java.lang.reflect.Method;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.client.MockBookKeeper;
-import org.apache.bookkeeper.client.MockLedgerHandle;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.pulsar.broker.PulsarServerException;
@@ -46,12 +55,14 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl;
import org.apache.pulsar.broker.s3offload.impl.DataBlockHeaderImpl;
import org.apache.pulsar.broker.s3offload.impl.OffloadIndexBlockImpl;
-import org.apache.pulsar.broker.s3offload.impl.OffloadIndexTest.LedgerMetadataMock;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
+@Slf4j
class S3ManagedLedgerOffloaderTest extends S3TestBase {
+ private static final int DEFAULT_BLOCK_SIZE = 5*1024*1024;
+ private static final int DEFAULT_READ_BUFFER_SIZE = 1*1024*1024;
final ScheduledExecutorService scheduler;
final MockBookKeeper bk;
@@ -60,86 +71,53 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
bk = new MockBookKeeper(MockedPulsarServiceBaseTest.createMockZooKeeper());
}
- private ReadHandle buildReadHandle(int entryCount) throws Exception {
- MockLedgerHandle lh = (MockLedgerHandle)bk.createLedger(1,1,1, BookKeeper.DigestType.CRC32, "foobar".getBytes());
+ private ReadHandle buildReadHandle() throws Exception {
+ return buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
+ }
- for (int index = 0; index < entryCount; index ++) {
- lh.addEntry(("foooobarrr").getBytes()); // add entry with 10 bytes data
- }
+ private ReadHandle buildReadHandle(int maxBlockSize, int blockCount) throws Exception {
+ Assert.assertTrue(maxBlockSize > DataBlockHeaderImpl.getDataStartOffset());
- lh.close();
+ LedgerHandle lh = bk.createLedger(1,1,1, BookKeeper.DigestType.CRC32, "foobar".getBytes());
- // mock ledgerMetadata, so the lac in metadata is not -1;
- MockLedgerHandle spy = Mockito.spy(lh);
- LedgerMetadataMock metadata = new LedgerMetadataMock(1, 1, 1,
- DigestType.CRC32C, "foobar".getBytes(), null, false);
- metadata.setLastEntryId(entryCount - 1);
- Mockito.when(spy.getLedgerMetadata()).thenReturn(metadata);
+ int i = 0;
+ int bytesWrittenCurrentBlock = DataBlockHeaderImpl.getDataStartOffset();
+ int blocksWritten = 1;
+ int entries = 0;
- return spy;
- }
+ while (blocksWritten < blockCount
+ || bytesWrittenCurrentBlock < maxBlockSize/2) {
+ byte[] entry = ("foobar"+i).getBytes();
+ int sizeInBlock = entry.length + 12 /* ENTRY_HEADER_SIZE */;
- private void verifyS3ObjectRead(S3Object object, S3Object indexObject, ReadHandle readHandle, int indexEntryCount, int entryCount, int maxBlockSize) throws Exception {
- DataInputStream dis = new DataInputStream(object.getObjectContent());
- int isLength = dis.available();
-
- // read out index block
- DataInputStream indexBlockIs = new DataInputStream(indexObject.getObjectContent());
- OffloadIndexBlock indexBlock = OffloadIndexBlockImpl.get(indexBlockIs);
-
- // 1. verify index block with passed in index entry count
- Assert.assertEquals(indexBlock.getEntryCount(), indexEntryCount);
-
- // 2. verify index block read out each indexEntry.
- int entryIdTracker = 0;
- int startPartIdTracker = 1;
- int startOffsetTracker = 0;
- long entryBytesUploaded = 0;
- int entryLength = 10;
- for (int i = 0; i < indexEntryCount; i ++) {
- // 2.1 verify each indexEntry in header block
- OffloadIndexEntry indexEntry = indexBlock.getIndexEntryForEntry(entryIdTracker);
-
- Assert.assertEquals(indexEntry.getPartId(), startPartIdTracker);
- Assert.assertEquals(indexEntry.getEntryId(), entryIdTracker);
- Assert.assertEquals(indexEntry.getOffset(), startOffsetTracker);
-
- // read out and verify each data block related to this index entry
- // 2.2 verify data block header.
- DataBlockHeader headerReadout = DataBlockHeaderImpl.fromStream(dis);
- int expectedBlockSize = BlockAwareSegmentInputStreamImpl
- .calculateBlockSize(maxBlockSize, readHandle, entryIdTracker, entryBytesUploaded);
- Assert.assertEquals(headerReadout.getBlockLength(), expectedBlockSize);
- Assert.assertEquals(headerReadout.getFirstEntryId(), entryIdTracker);
-
- // 2.3 verify data block
- int entrySize = 0;
- long entryId = 0;
- for (int bytesReadout = headerReadout.getBlockLength() - DataBlockHeaderImpl.getDataStartOffset();
- bytesReadout > 0;
- bytesReadout -= (4 + 8 + entrySize)) {
- entrySize = dis.readInt();
- entryId = dis.readLong();
- byte[] bytes = new byte[(int) entrySize];
- dis.read(bytes);
-
- Assert.assertEquals(entrySize, entryLength);
- Assert.assertEquals(entryId, entryIdTracker ++);
- entryBytesUploaded += entrySize;
+ if (bytesWrittenCurrentBlock + sizeInBlock > maxBlockSize) {
+ bytesWrittenCurrentBlock = DataBlockHeaderImpl.getDataStartOffset();
+ blocksWritten++;
+ entries = 0;
}
+ entries++;
- startPartIdTracker ++;
- startOffsetTracker += headerReadout.getBlockLength();
+ lh.addEntry(entry);
+ bytesWrittenCurrentBlock += sizeInBlock;
+ i++;
}
- return;
+ // workaround mock not closing metadata correctly
+ Method close = LedgerMetadata.class.getDeclaredMethod("close", long.class);
+ close.setAccessible(true);
+ close.invoke(lh.getLedgerMetadata(), lh.getLastAddConfirmed());
+
+ lh.close();
+
+ return bk.newOpenLedgerOp().withLedgerId(lh.getId())
+ .withPassword("foobar".getBytes()).withDigestType(DigestType.CRC32).execute().get();
}
@Test
public void testHappyCase() throws Exception {
- LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, 1024);
-
- offloader.offload(buildReadHandle(1), UUID.randomUUID(), new HashMap<>()).get();
+ LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler,
+ DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
+ offloader.offload(buildReadHandle(), UUID.randomUUID(), new HashMap<>()).get();
}
@Test
@@ -152,7 +130,7 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
LedgerOffloader offloader = S3ManagedLedgerOffloader.create(conf, scheduler);
try {
- offloader.offload(buildReadHandle(1), UUID.randomUUID(), new HashMap<>()).get();
+ offloader.offload(buildReadHandle(), UUID.randomUUID(), new HashMap<>()).get();
Assert.fail("Shouldn't be able to add to bucket");
} catch (ExecutionException e) {
Assert.assertTrue(e.getMessage().contains("NoSuchBucket"));
@@ -188,36 +166,38 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
}
@Test
- public void testOffload() throws Exception {
- int entryLength = 10;
- int entryNumberEachBlock = 10;
- ServiceConfiguration conf = new ServiceConfiguration();
- conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME);
+ public void testOffloadAndRead() throws Exception {
+ ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 3);
+ LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler,
+ DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
+ UUID uuid = UUID.randomUUID();
+ offloader.offload(toWrite, uuid, new HashMap<>()).get();
- conf.setS3ManagedLedgerOffloadBucket(BUCKET);
- conf.setS3ManagedLedgerOffloadRegion("eu-west-1");
- conf.setS3ManagedLedgerOffloadServiceEndpoint(s3endpoint);
- conf.setS3ManagedLedgerOffloadMaxBlockSizeInBytes(
- DataBlockHeaderImpl.getDataStartOffset() + (entryLength + 12) * entryNumberEachBlock);
- LedgerOffloader offloader = S3ManagedLedgerOffloader.create(conf, scheduler);
+ ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid).get();
+ Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed());
- // offload 30 entries, which will be placed into 3 data blocks.
- int entryCount = 30;
- ReadHandle readHandle = buildReadHandle(entryCount);
- UUID uuid = UUID.randomUUID();
- offloader.offload(readHandle, uuid, new HashMap<>()).get();
+ try (LedgerEntries toWriteEntries = toWrite.read(0, toWrite.getLastAddConfirmed());
+ LedgerEntries toTestEntries = toTest.read(0, toTest.getLastAddConfirmed())) {
+ Iterator<LedgerEntry> toWriteIter = toWriteEntries.iterator();
+ Iterator<LedgerEntry> toTestIter = toTestEntries.iterator();
- S3Object obj = s3client.getObject(BUCKET, dataBlockOffloadKey(readHandle, uuid));
- S3Object indexObj = s3client.getObject(BUCKET, S3ManagedLedgerOffloader.indexBlockOffloadKey(readHandle, uuid));
+ while (toWriteIter.hasNext() && toTestIter.hasNext()) {
+ LedgerEntry toWriteEntry = toWriteIter.next();
+ LedgerEntry toTestEntry = toTestIter.next();
- verifyS3ObjectRead(obj, indexObj, readHandle, 3, 30, conf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes());
+ Assert.assertEquals(toWriteEntry.getLedgerId(), toTestEntry.getLedgerId());
+ Assert.assertEquals(toWriteEntry.getEntryId(), toTestEntry.getEntryId());
+ Assert.assertEquals(toWriteEntry.getLength(), toTestEntry.getLength());
+ Assert.assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer());
+ }
+ Assert.assertFalse(toWriteIter.hasNext());
+ Assert.assertFalse(toTestIter.hasNext());
+ }
}
@Test
public void testOffloadFailInitDataBlockUpload() throws Exception {
- int maxBlockSize = 1024;
- int entryCount = 3;
- ReadHandle readHandle = buildReadHandle(entryCount);
+ ReadHandle readHandle = buildReadHandle();
UUID uuid = UUID.randomUUID();
String failureString = "fail InitDataBlockUpload";
@@ -228,23 +208,22 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
.doThrow(new AmazonServiceException(failureString))
.when(mockS3client).initiateMultipartUpload(any());
- LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler, maxBlockSize);
+ LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler,
+ DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
offloader.offload(readHandle, uuid, new HashMap<>()).get();
Assert.fail("Should throw exception when initiateMultipartUpload");
} catch (Exception e) {
// expected
Assert.assertTrue(e.getCause() instanceof AmazonServiceException);
Assert.assertTrue(e.getCause().getMessage().contains(failureString));
- Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle, uuid)));
- Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle, uuid)));
+ Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid)));
+ Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid)));
}
}
@Test
public void testOffloadFailDataBlockPartUpload() throws Exception {
- int maxBlockSize = 1024;
- int entryCount = 3;
- ReadHandle readHandle = buildReadHandle(entryCount);
+ ReadHandle readHandle = buildReadHandle();
UUID uuid = UUID.randomUUID();
String failureString = "fail DataBlockPartUpload";
@@ -256,23 +235,22 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
.when(mockS3client).uploadPart(any());
Mockito.doNothing().when(mockS3client).abortMultipartUpload(any());
- LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler, maxBlockSize);
+ LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler,
+ DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
offloader.offload(readHandle, uuid, new HashMap<>()).get();
Assert.fail("Should throw exception for when uploadPart");
} catch (Exception e) {
// expected
Assert.assertTrue(e.getCause() instanceof AmazonServiceException);
Assert.assertTrue(e.getCause().getMessage().contains(failureString));
- Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle, uuid)));
- Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle, uuid)));
+ Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid)));
+ Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid)));
}
}
@Test
public void testOffloadFailDataBlockUploadComplete() throws Exception {
- int maxBlockSize = 1024;
- int entryCount = 3;
- ReadHandle readHandle = buildReadHandle(entryCount);
+ ReadHandle readHandle = buildReadHandle();
UUID uuid = UUID.randomUUID();
String failureString = "fail DataBlockUploadComplete";
@@ -284,23 +262,22 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
.when(mockS3client).completeMultipartUpload(any());
Mockito.doNothing().when(mockS3client).abortMultipartUpload(any());
- LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler, maxBlockSize);
+ LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler,
+ DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
offloader.offload(readHandle, uuid, new HashMap<>()).get();
Assert.fail("Should throw exception for when completeMultipartUpload");
} catch (Exception e) {
// expected
Assert.assertTrue(e.getCause() instanceof AmazonServiceException);
Assert.assertTrue(e.getCause().getMessage().contains(failureString));
- Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle, uuid)));
- Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle, uuid)));
+ Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid)));
+ Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid)));
}
}
@Test
public void testOffloadFailPutIndexBlock() throws Exception {
- int maxBlockSize = 1024;
- int entryCount = 3;
- ReadHandle readHandle = buildReadHandle(entryCount);
+ ReadHandle readHandle = buildReadHandle();
UUID uuid = UUID.randomUUID();
String failureString = "fail putObject";
@@ -311,15 +288,86 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
.doThrow(new AmazonServiceException(failureString))
.when(mockS3client).putObject(any());
- LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler, maxBlockSize);
+ LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler,
+ DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
offloader.offload(readHandle, uuid, new HashMap<>()).get();
Assert.fail("Should throw exception for when putObject for index block");
} catch (Exception e) {
// expected
Assert.assertTrue(e.getCause() instanceof AmazonServiceException);
Assert.assertTrue(e.getCause().getMessage().contains(failureString));
- Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle, uuid)));
- Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle, uuid)));
+ Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid)));
+ Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid)));
+ }
+ }
+
+ @Test
+ public void testOffloadReadRandomAccess() throws Exception {
+ ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 3);
+ long[][] randomAccesses = new long[10][2];
+ Random r = new Random(0);
+ for (int i = 0; i < 10; i++) {
+ long first = r.nextInt((int)toWrite.getLastAddConfirmed());
+ long second = r.nextInt((int)toWrite.getLastAddConfirmed());
+ if (second < first) {
+ long tmp = first;
+ first = second;
+ second = tmp;
+ }
+ randomAccesses[i][0] = first;
+ randomAccesses[i][1] = second;
+ }
+
+ LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler,
+ DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
+ UUID uuid = UUID.randomUUID();
+ offloader.offload(toWrite, uuid, new HashMap<>()).get();
+
+ ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid).get();
+ Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed());
+
+ for (long[] access : randomAccesses) {
+ try (LedgerEntries toWriteEntries = toWrite.read(access[0], access[1]);
+ LedgerEntries toTestEntries = toTest.read(access[0], access[1])) {
+ Iterator<LedgerEntry> toWriteIter = toWriteEntries.iterator();
+ Iterator<LedgerEntry> toTestIter = toTestEntries.iterator();
+
+ while (toWriteIter.hasNext() && toTestIter.hasNext()) {
+ LedgerEntry toWriteEntry = toWriteIter.next();
+ LedgerEntry toTestEntry = toTestIter.next();
+
+ Assert.assertEquals(toWriteEntry.getLedgerId(), toTestEntry.getLedgerId());
+ Assert.assertEquals(toWriteEntry.getEntryId(), toTestEntry.getEntryId());
+ Assert.assertEquals(toWriteEntry.getLength(), toTestEntry.getLength());
+ Assert.assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer());
+ }
+ Assert.assertFalse(toWriteIter.hasNext());
+ Assert.assertFalse(toTestIter.hasNext());
+ }
+ }
+ }
+
+ @Test
+ public void testOffloadReadInvalidEntryIds() throws Exception {
+ ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
+ LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler,
+ DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
+ UUID uuid = UUID.randomUUID();
+ offloader.offload(toWrite, uuid, new HashMap<>()).get();
+
+ ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid).get();
+ Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed());
+
+ try {
+ toTest.read(-1, -1);
+ Assert.fail("Shouldn't be able to read anything");
+ } catch (BKException.BKIncorrectParameterException e) {
+ }
+
+ try {
+ toTest.read(0, toTest.getLastAddConfirmed() + 1);
+ Assert.fail("Shouldn't be able to read anything");
+ } catch (BKException.BKIncorrectParameterException e) {
}
}
}
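As an aside, the new read buffer setting can also be tuned programmatically; a hedged sketch (the 4MB value is only illustrative, the default added by this patch is 1MB):

    // Hypothetical sketch, equivalent to setting
    // s3ManagedLedgerOffloadReadBufferSizeInBytes in broker.conf.
    ServiceConfiguration conf = new ServiceConfiguration();
    conf.setS3ManagedLedgerOffloadReadBufferSizeInBytes(4 * 1024 * 1024); // example value
    // S3ManagedLedgerOffloader.create(conf, scheduler) passes this size to the
    // S3-backed input stream used when reading offloaded ledgers back.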