Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/16 17:19:14 UTC

[incubator-pulsar] branch master updated: ReadHandle implementation backed by S3 (#1790)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f0b1471  ReadHandle implementation backed by S3 (#1790)
f0b1471 is described below

commit f0b1471a438ea0b4f77c1c00e0b4c64eb57074f7
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Wed May 16 19:19:11 2018 +0200

    ReadHandle implementation backed by S3 (#1790)
    
    Implementation of the BookKeeper ReadHandle interface, which reads from
    an S3 object.
    
    Master Issue: #1511
---
 conf/broker.conf                                   |   3 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  12 +
 .../pulsar/broker/s3offload/OffloadIndexEntry.java |   7 +-
 .../broker/s3offload/S3ManagedLedgerOffloader.java |  37 ++-
 .../s3offload/impl/OffloadIndexEntryImpl.java      |   4 +
 .../s3offload/impl/S3BackedReadHandleImpl.java     | 207 +++++++++++++++
 .../s3offload/S3ManagedLedgerOffloaderTest.java    | 278 ++++++++++++---------
 7 files changed, 421 insertions(+), 127 deletions(-)
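
For context, the offload/read-back flow that this change completes is exercised end to end by the tests below. A minimal sketch of that flow, assuming an existing ReadHandle and a ServiceConfiguration with the S3 offload settings populated (error handling omitted):

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.UUID;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;

    import org.apache.bookkeeper.client.api.LedgerEntries;
    import org.apache.bookkeeper.client.api.LedgerEntry;
    import org.apache.bookkeeper.client.api.ReadHandle;
    import org.apache.bookkeeper.mledger.LedgerOffloader;
    import org.apache.pulsar.broker.ServiceConfiguration;
    import org.apache.pulsar.broker.s3offload.S3ManagedLedgerOffloader;

    static void offloadAndReadBack(ServiceConfiguration conf, ReadHandle ledger)
            throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        LedgerOffloader offloader = S3ManagedLedgerOffloader.create(conf, scheduler);

        // Upload the ledger contents to S3 as a data block plus an index block.
        UUID uuid = UUID.randomUUID();
        offloader.offload(ledger, uuid, new HashMap<>()).get();

        // Open a ReadHandle backed by the S3 objects and stream the entries back.
        ReadHandle offloaded = offloader.readOffloaded(ledger.getId(), uuid).get();
        try (LedgerEntries entries = offloaded.read(0, offloaded.getLastAddConfirmed())) {
            Iterator<LedgerEntry> it = entries.iterator();
            while (it.hasNext()) {
                LedgerEntry entry = it.next();
                // consume entry.getEntryBuffer() here
            }
        }
        scheduler.shutdown();
    }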

diff --git a/conf/broker.conf b/conf/broker.conf
index cf9faa8..39523cd 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -495,6 +495,9 @@ s3ManagedLedgerOffloadServiceEndpoint=
 # For Amazon S3 ledger offload, Max block size in bytes.
 s3ManagedLedgerOffloadMaxBlockSizeInBytes=67108864
 
+# For Amazon S3 ledger offload, Read buffer size in bytes (1MB by default)
+s3ManagedLedgerOffloadReadBufferSizeInBytes=1048576
+
 ### --- Deprecated config variables --- ###
 
 # Deprecated. Use configurationStoreServers
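
The same setting can also be applied programmatically on the broker configuration; per the @FieldContext constraint added below, values under 1024 bytes are rejected. A minimal sketch:

    ServiceConfiguration conf = new ServiceConfiguration();
    conf.setS3ManagedLedgerOffloadReadBufferSizeInBytes(1024 * 1024); // 1MB, the default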
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 74363cc..d28ff4c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -487,6 +487,10 @@ public class ServiceConfiguration implements PulsarConfiguration {
     // For Amazon S3 ledger offload, Max block size in bytes.
     private int s3ManagedLedgerOffloadMaxBlockSizeInBytes = 64 * 1024 * 1024;
 
+    // For Amazon S3 ledger offload, Read buffer size in bytes.
+    @FieldContext(minValue = 1024)
+    private int s3ManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; // 1MB
+
     public String getZookeeperServers() {
         return zookeeperServers;
     }
@@ -1694,4 +1698,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
         return this.s3ManagedLedgerOffloadMaxBlockSizeInBytes;
     }
 
+    public void setS3ManagedLedgerOffloadReadBufferSizeInBytes(int readBufferSizeInBytes) {
+        this.s3ManagedLedgerOffloadReadBufferSizeInBytes = readBufferSizeInBytes;
+    }
+
+    public int getS3ManagedLedgerOffloadReadBufferSizeInBytes() {
+        return this.s3ManagedLedgerOffloadReadBufferSizeInBytes;
+    }
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexEntry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexEntry.java
index 03927d3..6a976ff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexEntry.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexEntry.java
@@ -43,8 +43,13 @@ public interface OffloadIndexEntry {
     int getPartId();
 
     /**
-     * Get the offset of this message entry in code storage.
+     * Get the offset of this block within the object.
      */
     long getOffset();
+
+    /**
+     * Get the offset of the block's data within the object.
+     */
+    long getDataOffset();
 }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
index 76920de..4d5b388 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
@@ -45,6 +45,7 @@ import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl;
+import org.apache.pulsar.broker.s3offload.impl.S3BackedReadHandleImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,6 +58,7 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
     private final String bucket;
     // max block size for each data block.
     private int maxBlockSize;
+    private final int readBufferSize;
 
     public static S3ManagedLedgerOffloader create(ServiceConfiguration conf,
                                                   ScheduledExecutorService scheduler)
@@ -65,6 +67,7 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
         String bucket = conf.getS3ManagedLedgerOffloadBucket();
         String endpoint = conf.getS3ManagedLedgerOffloadServiceEndpoint();
         int maxBlockSize = conf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes();
+        int readBufferSize = conf.getS3ManagedLedgerOffloadReadBufferSizeInBytes();
 
         if (Strings.isNullOrEmpty(region)) {
             throw new PulsarServerException("s3ManagedLedgerOffloadRegion cannot be empty if s3 offload is enabled");
@@ -80,22 +83,24 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
         } else {
             builder.setRegion(region);
         }
-        return new S3ManagedLedgerOffloader(builder.build(), bucket, scheduler, maxBlockSize);
+        return new S3ManagedLedgerOffloader(builder.build(), bucket, scheduler, maxBlockSize, readBufferSize);
     }
 
-    S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, ScheduledExecutorService scheduler, int maxBlockSize) {
+    S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, ScheduledExecutorService scheduler,
+                             int maxBlockSize, int readBufferSize) {
         this.s3client = s3client;
         this.bucket = bucket;
         this.scheduler = scheduler;
         this.maxBlockSize = maxBlockSize;
+        this.readBufferSize = readBufferSize;
     }
 
-    static String dataBlockOffloadKey(ReadHandle readHandle, UUID uuid) {
-        return String.format("ledger-%d-%s", readHandle.getId(), uuid.toString());
+    static String dataBlockOffloadKey(long ledgerId, UUID uuid) {
+        return String.format("ledger-%d-%s", ledgerId, uuid.toString());
     }
 
-    static String indexBlockOffloadKey(ReadHandle readHandle, UUID uuid) {
-        return String.format("ledger-%d-%s-index", readHandle.getId(), uuid.toString());
+    static String indexBlockOffloadKey(long ledgerId, UUID uuid) {
+        return String.format("ledger-%d-%s-index", ledgerId, uuid.toString());
     }
 
     // upload DataBlock to s3 using MultiPartUpload, and indexBlock in a new Block,
@@ -107,8 +112,8 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
         scheduler.submit(() -> {
             OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create()
                 .withMetadata(readHandle.getLedgerMetadata());
-            String dataBlockKey = dataBlockOffloadKey(readHandle, uuid);
-            String indexBlockKey = indexBlockOffloadKey(readHandle, uuid);
+            String dataBlockKey = dataBlockOffloadKey(readHandle.getId(), uuid);
+            String indexBlockKey = indexBlockOffloadKey(readHandle.getId(), uuid);
             InitiateMultipartUploadRequest dataBlockReq = new InitiateMultipartUploadRequest(bucket, dataBlockKey);
             InitiateMultipartUploadResult dataBlockRes = null;
 
@@ -174,12 +179,12 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
                 metadata.setContentLength(indexStream.available());
                 s3client.putObject(new PutObjectRequest(
                     bucket,
-                    indexBlockOffloadKey(readHandle, uuid),
+                    indexBlockKey,
                     indexStream,
                     metadata));
                 promise.complete(null);
             } catch (Throwable t) {
-                s3client.deleteObject(bucket, dataBlockOffloadKey(readHandle, uuid));
+                s3client.deleteObject(bucket, dataBlockKey);
                 promise.completeExceptionally(t);
                 return;
             }
@@ -190,7 +195,17 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
     @Override
     public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid) {
         CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
-        promise.completeExceptionally(new UnsupportedOperationException());
+        String key = dataBlockOffloadKey(ledgerId, uid);
+        String indexKey = indexBlockOffloadKey(ledgerId, uid);
+        scheduler.submit(() -> {
+                try {
+                    promise.complete(S3BackedReadHandleImpl.open(scheduler, s3client,
+                                                                 bucket, key, indexKey,
+                                                                 ledgerId, readBufferSize));
+                } catch (Throwable t) {
+                    promise.completeExceptionally(t);
+                }
+            });
         return promise;
     }
 
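Note that the key helpers above now take a ledger id instead of a ReadHandle, which is what allows readOffloaded to locate both objects without first opening the original ledger. For illustration (the helpers are package-private, so this only works from within the same package; ledger id 42 is arbitrary):

    UUID uuid = UUID.randomUUID();
    String dataKey  = S3ManagedLedgerOffloader.dataBlockOffloadKey(42L, uuid);  // "ledger-42-<uuid>"
    String indexKey = S3ManagedLedgerOffloader.indexBlockOffloadKey(42L, uuid); // "ledger-42-<uuid>-index"
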
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java
index d74ba93..d8d2267 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java
@@ -48,6 +48,10 @@ public class OffloadIndexEntryImpl implements OffloadIndexEntry {
     public long getOffset() {
         return offset;
     }
+    @Override
+    public long getDataOffset() {
+        return offset + DataBlockHeaderImpl.getDataStartOffset();
+    }
 
     public OffloadIndexEntryImpl(long entryId, int partId, long offset) {
         this.entryId = entryId;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java
new file mode 100644
index 0000000..037ea67
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload.impl;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+
+import org.apache.pulsar.broker.s3offload.OffloadIndexBlock;
+import org.apache.pulsar.broker.s3offload.OffloadIndexBlockBuilder;
+import org.apache.pulsar.broker.s3offload.OffloadIndexEntry;
+import org.apache.pulsar.broker.s3offload.S3BackedInputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class S3BackedReadHandleImpl implements ReadHandle {
+    private static final Logger log = LoggerFactory.getLogger(S3BackedReadHandleImpl.class);
+
+    private final long ledgerId;
+    private final OffloadIndexBlock index;
+    private final S3BackedInputStream inputStream;
+    private final DataInputStream dataStream;
+    private final ExecutorService executor;
+
+    private S3BackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
+                                   S3BackedInputStream inputStream,
+                                   ExecutorService executor) {
+        this.ledgerId = ledgerId;
+        this.index = index;
+        this.inputStream = inputStream;
+        this.dataStream = new DataInputStream(inputStream);
+        this.executor = executor;
+    }
+
+    @Override
+    public long getId() {
+        return ledgerId;
+    }
+
+    @Override
+    public LedgerMetadata getLedgerMetadata() {
+        return index.getLedgerMetadata();
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        executor.submit(() -> {
+                try {
+                    index.close();
+                    inputStream.close();
+                    promise.complete(null);
+                } catch (IOException t) {
+                    promise.completeExceptionally(t);
+                }
+            });
+        return promise;
+    }
+
+    @Override
+    public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
+        log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
+        CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
+        executor.submit(() -> {
+                if (firstEntry > lastEntry
+                    || firstEntry < 0
+                    || lastEntry > getLastAddConfirmed()) {
+                    promise.completeExceptionally(new BKException.BKIncorrectParameterException());
+                    return;
+                }
+                long entriesToRead = (lastEntry - firstEntry) + 1;
+                List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
+                long nextExpectedId = firstEntry;
+                try {
+                    OffloadIndexEntry entry = index.getIndexEntryForEntry(firstEntry);
+                    inputStream.seek(entry.getDataOffset());
+
+                    while (entriesToRead > 0) {
+                        int length = dataStream.readInt();
+                        if (length < 0) { // hit padding or new block
+                            inputStream.seekForward(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
+                            length = dataStream.readInt();
+                        }
+                        long entryId = dataStream.readLong();
+
+                        if (entryId == nextExpectedId) {
+                            ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(length, length);
+                            entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
+                            int toWrite = length;
+                            while (toWrite > 0) {
+                                toWrite -= buf.writeBytes(dataStream, toWrite);
+                            }
+                            entriesToRead--;
+                            nextExpectedId++;
+                        } else if (entryId > lastEntry) {
+                            log.info("Expected to read {}, but read {}, which is greater than last entry {}",
+                                     nextExpectedId, entryId, lastEntry);
+                            throw new BKException.BKUnexpectedConditionException();
+                        } else {
+                            inputStream.skip(length);
+                        }
+                    }
+
+                    promise.complete(LedgerEntriesImpl.create(entries));
+                } catch (Throwable t) {
+                    promise.completeExceptionally(t);
+                    entries.forEach(LedgerEntry::close);
+                }
+            });
+        return promise;
+    }
+
+    @Override
+    public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) {
+        return readAsync(firstEntry, lastEntry);
+    }
+
+    @Override
+    public CompletableFuture<Long> readLastAddConfirmedAsync() {
+        return CompletableFuture.completedFuture(getLastAddConfirmed());
+    }
+
+    @Override
+    public CompletableFuture<Long> tryReadLastAddConfirmedAsync() {
+        return CompletableFuture.completedFuture(getLastAddConfirmed());
+    }
+
+    @Override
+    public long getLastAddConfirmed() {
+        return getLedgerMetadata().getLastEntryId();
+    }
+
+    @Override
+    public long getLength() {
+        return getLedgerMetadata().getLength();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return getLedgerMetadata().isClosed();
+    }
+
+    @Override
+    public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId,
+                                                                                      long timeOutInMillis,
+                                                                                      boolean parallel) {
+        CompletableFuture<LastConfirmedAndEntry> promise = new CompletableFuture<>();
+        promise.completeExceptionally(new UnsupportedOperationException());
+        return promise;
+    }
+
+    public static ReadHandle open(ScheduledExecutorService executor,
+                                  AmazonS3 s3client, String bucket, String key, String indexKey,
+                                  long ledgerId, int readBufferSize)
+            throws AmazonClientException, IOException {
+        GetObjectRequest req = new GetObjectRequest(bucket, indexKey);
+        try (S3Object obj = s3client.getObject(req)) {
+            OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create();
+            OffloadIndexBlock index = indexBuilder.fromStream(obj.getObjectContent());
+
+            ObjectMetadata dataMetadata = s3client.getObjectMetadata(bucket, key); // FIXME: this should be part of index
+            S3BackedInputStream inputStream = new S3BackedInputStreamImpl(s3client, bucket, key,
+                                                                          dataMetadata.getContentLength(),
+                                                                          readBufferSize);
+            return new S3BackedReadHandleImpl(ledgerId, index, inputStream, executor);
+        }
+    }
+}
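
The scan loop in readAsync depends on the data block layout written at offload time: each entry is framed as a 4-byte length and an 8-byte entry id, followed by the payload, and a negative length marks the padding at the end of a block. A stripped-down sketch of one frame read, assuming the stream is positioned at an entry boundary (the real code above writes into a pooled ByteBuf rather than a byte[]):

    int length = dataStream.readInt();     // 4-byte payload length; < 0 means padding,
                                           // so seek to the next block via the index
    long entryId = dataStream.readLong();  // 8-byte entry id
    byte[] payload = new byte[length];
    dataStream.readFully(payload);         // entry payload
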
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
index 583b0e0..f9a043b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
@@ -24,20 +24,29 @@ import static org.mockito.Matchers.any;
 
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.S3Object;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.io.DataInputStream;
+import java.lang.reflect.Method;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.client.MockBookKeeper;
-import org.apache.bookkeeper.client.MockLedgerHandle;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.pulsar.broker.PulsarServerException;
@@ -46,12 +55,14 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl;
 import org.apache.pulsar.broker.s3offload.impl.DataBlockHeaderImpl;
 import org.apache.pulsar.broker.s3offload.impl.OffloadIndexBlockImpl;
-import org.apache.pulsar.broker.s3offload.impl.OffloadIndexTest.LedgerMetadataMock;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+@Slf4j
 class S3ManagedLedgerOffloaderTest extends S3TestBase {
+    private static final int DEFAULT_BLOCK_SIZE = 5*1024*1024;
+    private static final int DEFAULT_READ_BUFFER_SIZE = 1*1024*1024;
     final ScheduledExecutorService scheduler;
     final MockBookKeeper bk;
 
@@ -60,86 +71,53 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
         bk = new MockBookKeeper(MockedPulsarServiceBaseTest.createMockZooKeeper());
     }
 
-    private ReadHandle buildReadHandle(int entryCount) throws Exception {
-        MockLedgerHandle lh = (MockLedgerHandle)bk.createLedger(1,1,1, BookKeeper.DigestType.CRC32, "foobar".getBytes());
+    private ReadHandle buildReadHandle() throws Exception {
+        return buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
+    }
 
-        for (int index = 0; index < entryCount; index ++) {
-            lh.addEntry(("foooobarrr").getBytes()); // add entry with 10 bytes data
-        }
+    private ReadHandle buildReadHandle(int maxBlockSize, int blockCount) throws Exception {
+        Assert.assertTrue(maxBlockSize > DataBlockHeaderImpl.getDataStartOffset());
 
-        lh.close();
+        LedgerHandle lh = bk.createLedger(1,1,1, BookKeeper.DigestType.CRC32, "foobar".getBytes());
 
-        // mock ledgerMetadata, so the lac in metadata is not -1;
-        MockLedgerHandle spy = Mockito.spy(lh);
-        LedgerMetadataMock metadata = new LedgerMetadataMock(1, 1, 1,
-            DigestType.CRC32C, "foobar".getBytes(), null, false);
-        metadata.setLastEntryId(entryCount - 1);
-        Mockito.when(spy.getLedgerMetadata()).thenReturn(metadata);
+        int i = 0;
+        int bytesWrittenCurrentBlock = DataBlockHeaderImpl.getDataStartOffset();
+        int blocksWritten = 1;
+        int entries = 0;
 
-        return spy;
-    }
+        while (blocksWritten < blockCount
+               || bytesWrittenCurrentBlock < maxBlockSize/2) {
+            byte[] entry = ("foobar"+i).getBytes();
+            int sizeInBlock = entry.length + 12 /* ENTRY_HEADER_SIZE */;
 
-    private void verifyS3ObjectRead(S3Object object, S3Object indexObject, ReadHandle readHandle, int indexEntryCount, int entryCount, int maxBlockSize) throws Exception {
-        DataInputStream dis = new DataInputStream(object.getObjectContent());
-        int isLength = dis.available();
-
-        // read out index block
-        DataInputStream indexBlockIs = new DataInputStream(indexObject.getObjectContent());
-        OffloadIndexBlock indexBlock = OffloadIndexBlockImpl.get(indexBlockIs);
-
-        // 1. verify index block with passed in index entry count
-        Assert.assertEquals(indexBlock.getEntryCount(), indexEntryCount);
-
-        // 2. verify index block read out each indexEntry.
-        int entryIdTracker = 0;
-        int startPartIdTracker = 1;
-        int startOffsetTracker = 0;
-        long entryBytesUploaded = 0;
-        int entryLength = 10;
-        for (int i = 0; i < indexEntryCount; i ++) {
-            // 2.1 verify each indexEntry in header block
-            OffloadIndexEntry indexEntry = indexBlock.getIndexEntryForEntry(entryIdTracker);
-
-            Assert.assertEquals(indexEntry.getPartId(), startPartIdTracker);
-            Assert.assertEquals(indexEntry.getEntryId(), entryIdTracker);
-            Assert.assertEquals(indexEntry.getOffset(), startOffsetTracker);
-
-            // read out and verify each data block related to this index entry
-            // 2.2 verify data block header.
-            DataBlockHeader headerReadout = DataBlockHeaderImpl.fromStream(dis);
-            int expectedBlockSize = BlockAwareSegmentInputStreamImpl
-                .calculateBlockSize(maxBlockSize, readHandle, entryIdTracker, entryBytesUploaded);
-            Assert.assertEquals(headerReadout.getBlockLength(), expectedBlockSize);
-            Assert.assertEquals(headerReadout.getFirstEntryId(), entryIdTracker);
-
-            // 2.3 verify data block
-            int entrySize = 0;
-            long entryId = 0;
-            for (int bytesReadout = headerReadout.getBlockLength() - DataBlockHeaderImpl.getDataStartOffset();
-                bytesReadout > 0;
-                bytesReadout -= (4 + 8 + entrySize)) {
-                entrySize = dis.readInt();
-                entryId = dis.readLong();
-                byte[] bytes = new byte[(int) entrySize];
-                dis.read(bytes);
-
-                Assert.assertEquals(entrySize, entryLength);
-                Assert.assertEquals(entryId, entryIdTracker ++);
-                entryBytesUploaded += entrySize;
+            if (bytesWrittenCurrentBlock + sizeInBlock > maxBlockSize) {
+                bytesWrittenCurrentBlock = DataBlockHeaderImpl.getDataStartOffset();
+                blocksWritten++;
+                entries = 0;
             }
+            entries++;
 
-            startPartIdTracker ++;
-            startOffsetTracker += headerReadout.getBlockLength();
+            lh.addEntry(entry);
+            bytesWrittenCurrentBlock += sizeInBlock;
+            i++;
         }
 
-        return;
+        // workaround mock not closing metadata correctly
+        Method close = LedgerMetadata.class.getDeclaredMethod("close", long.class);
+        close.setAccessible(true);
+        close.invoke(lh.getLedgerMetadata(), lh.getLastAddConfirmed());
+
+        lh.close();
+
+        return bk.newOpenLedgerOp().withLedgerId(lh.getId())
+            .withPassword("foobar".getBytes()).withDigestType(DigestType.CRC32).execute().get();
     }
 
     @Test
     public void testHappyCase() throws Exception {
-        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, 1024);
-
-        offloader.offload(buildReadHandle(1), UUID.randomUUID(), new HashMap<>()).get();
+        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler,
+                DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
+        offloader.offload(buildReadHandle(), UUID.randomUUID(), new HashMap<>()).get();
     }
 
     @Test
@@ -152,7 +130,7 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
         LedgerOffloader offloader = S3ManagedLedgerOffloader.create(conf, scheduler);
 
         try {
-            offloader.offload(buildReadHandle(1), UUID.randomUUID(), new HashMap<>()).get();
+            offloader.offload(buildReadHandle(), UUID.randomUUID(), new HashMap<>()).get();
             Assert.fail("Shouldn't be able to add to bucket");
         } catch (ExecutionException e) {
             Assert.assertTrue(e.getMessage().contains("NoSuchBucket"));
@@ -188,36 +166,38 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
     }
 
     @Test
-    public void testOffload() throws Exception {
-        int entryLength = 10;
-        int entryNumberEachBlock = 10;
-        ServiceConfiguration conf = new ServiceConfiguration();
-        conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME);
+    public void testOffloadAndRead() throws Exception {
+        ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 3);
+        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler,
+                                                                 DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
+        UUID uuid = UUID.randomUUID();
+        offloader.offload(toWrite, uuid, new HashMap<>()).get();
 
-        conf.setS3ManagedLedgerOffloadBucket(BUCKET);
-        conf.setS3ManagedLedgerOffloadRegion("eu-west-1");
-        conf.setS3ManagedLedgerOffloadServiceEndpoint(s3endpoint);
-        conf.setS3ManagedLedgerOffloadMaxBlockSizeInBytes(
-            DataBlockHeaderImpl.getDataStartOffset() + (entryLength + 12) * entryNumberEachBlock);
-        LedgerOffloader offloader = S3ManagedLedgerOffloader.create(conf, scheduler);
+        ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid).get();
+        Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed());
 
-        // offload 30 entries, which will be placed into 3 data blocks.
-        int entryCount = 30;
-        ReadHandle readHandle = buildReadHandle(entryCount);
-        UUID uuid = UUID.randomUUID();
-        offloader.offload(readHandle, uuid, new HashMap<>()).get();
+        try (LedgerEntries toWriteEntries = toWrite.read(0, toWrite.getLastAddConfirmed());
+             LedgerEntries toTestEntries = toTest.read(0, toTest.getLastAddConfirmed())) {
+            Iterator<LedgerEntry> toWriteIter = toWriteEntries.iterator();
+            Iterator<LedgerEntry> toTestIter = toTestEntries.iterator();
 
-        S3Object obj = s3client.getObject(BUCKET, dataBlockOffloadKey(readHandle, uuid));
-        S3Object indexObj = s3client.getObject(BUCKET, S3ManagedLedgerOffloader.indexBlockOffloadKey(readHandle, uuid));
+            while (toWriteIter.hasNext() && toTestIter.hasNext()) {
+                LedgerEntry toWriteEntry = toWriteIter.next();
+                LedgerEntry toTestEntry = toTestIter.next();
 
-        verifyS3ObjectRead(obj, indexObj, readHandle, 3, 30, conf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes());
+                Assert.assertEquals(toWriteEntry.getLedgerId(), toTestEntry.getLedgerId());
+                Assert.assertEquals(toWriteEntry.getEntryId(), toTestEntry.getEntryId());
+                Assert.assertEquals(toWriteEntry.getLength(), toTestEntry.getLength());
+                Assert.assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer());
+            }
+            Assert.assertFalse(toWriteIter.hasNext());
+            Assert.assertFalse(toTestIter.hasNext());
+        }
     }
 
     @Test
     public void testOffloadFailInitDataBlockUpload() throws Exception {
-        int maxBlockSize = 1024;
-        int entryCount = 3;
-        ReadHandle readHandle = buildReadHandle(entryCount);
+        ReadHandle readHandle = buildReadHandle();
         UUID uuid = UUID.randomUUID();
         String failureString = "fail InitDataBlockUpload";
 
@@ -228,23 +208,22 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
                 .doThrow(new AmazonServiceException(failureString))
                 .when(mockS3client).initiateMultipartUpload(any());
 
-            LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler, maxBlockSize);
+            LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler,
+                                                                     DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
             offloader.offload(readHandle, uuid, new HashMap<>()).get();
             Assert.fail("Should throw exception when initiateMultipartUpload");
         } catch (Exception e) {
             // excepted
             Assert.assertTrue(e.getCause() instanceof AmazonServiceException);
             Assert.assertTrue(e.getCause().getMessage().contains(failureString));
-            Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle, uuid)));
-            Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle, uuid)));
+            Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid)));
         }
     }
 
     @Test
     public void testOffloadFailDataBlockPartUpload() throws Exception {
-        int maxBlockSize = 1024;
-        int entryCount = 3;
-        ReadHandle readHandle = buildReadHandle(entryCount);
+        ReadHandle readHandle = buildReadHandle();
         UUID uuid = UUID.randomUUID();
         String failureString = "fail DataBlockPartUpload";
 
@@ -256,23 +235,22 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
                 .when(mockS3client).uploadPart(any());
             Mockito.doNothing().when(mockS3client).abortMultipartUpload(any());
 
-            LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler, maxBlockSize);
+            LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler,
+                                                                     DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
             offloader.offload(readHandle, uuid, new HashMap<>()).get();
             Assert.fail("Should throw exception for when uploadPart");
         } catch (Exception e) {
             // excepted
             Assert.assertTrue(e.getCause() instanceof AmazonServiceException);
             Assert.assertTrue(e.getCause().getMessage().contains(failureString));
-            Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle, uuid)));
-            Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle, uuid)));
+            Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid)));
         }
     }
 
     @Test
     public void testOffloadFailDataBlockUploadComplete() throws Exception {
-        int maxBlockSize = 1024;
-        int entryCount = 3;
-        ReadHandle readHandle = buildReadHandle(entryCount);
+        ReadHandle readHandle = buildReadHandle();
         UUID uuid = UUID.randomUUID();
         String failureString = "fail DataBlockUploadComplete";
 
@@ -284,23 +262,22 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
                 .when(mockS3client).completeMultipartUpload(any());
             Mockito.doNothing().when(mockS3client).abortMultipartUpload(any());
 
-            LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler, maxBlockSize);
+            LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler,
+                                                                     DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
             offloader.offload(readHandle, uuid, new HashMap<>()).get();
             Assert.fail("Should throw exception for when completeMultipartUpload");
         } catch (Exception e) {
             // excepted
             Assert.assertTrue(e.getCause() instanceof AmazonServiceException);
             Assert.assertTrue(e.getCause().getMessage().contains(failureString));
-            Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle, uuid)));
-            Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle, uuid)));
+            Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid)));
         }
     }
 
     @Test
     public void testOffloadFailPutIndexBlock() throws Exception {
-        int maxBlockSize = 1024;
-        int entryCount = 3;
-        ReadHandle readHandle = buildReadHandle(entryCount);
+        ReadHandle readHandle = buildReadHandle();
         UUID uuid = UUID.randomUUID();
         String failureString = "fail putObject";
 
@@ -311,15 +288,86 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
                 .doThrow(new AmazonServiceException(failureString))
                 .when(mockS3client).putObject(any());
 
-            LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler, maxBlockSize);
+            LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler,
+                                                                     DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
             offloader.offload(readHandle, uuid, new HashMap<>()).get();
             Assert.fail("Should throw exception for when putObject for index block");
         } catch (Exception e) {
             // excepted
             Assert.assertTrue(e.getCause() instanceof AmazonServiceException);
             Assert.assertTrue(e.getCause().getMessage().contains(failureString));
-            Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle, uuid)));
-            Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle, uuid)));
+            Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid)));
+        }
+    }
+
+    @Test
+    public void testOffloadReadRandomAccess() throws Exception {
+        ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 3);
+        long[][] randomAccesses = new long[10][2];
+        Random r = new Random(0);
+        for (int i = 0; i < 10; i++) {
+            long first = r.nextInt((int)toWrite.getLastAddConfirmed());
+            long second = r.nextInt((int)toWrite.getLastAddConfirmed());
+            if (second < first) {
+                long tmp = first;
+                first = second;
+                second = tmp;
+            }
+            randomAccesses[i][0] = first;
+            randomAccesses[i][1] = second;
+        }
+
+        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler,
+                                                                 DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
+        UUID uuid = UUID.randomUUID();
+        offloader.offload(toWrite, uuid, new HashMap<>()).get();
+
+        ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid).get();
+        Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed());
+
+        for (long[] access : randomAccesses) {
+            try (LedgerEntries toWriteEntries = toWrite.read(access[0], access[1]);
+                 LedgerEntries toTestEntries = toTest.read(access[0], access[1])) {
+                Iterator<LedgerEntry> toWriteIter = toWriteEntries.iterator();
+                Iterator<LedgerEntry> toTestIter = toTestEntries.iterator();
+
+                while (toWriteIter.hasNext() && toTestIter.hasNext()) {
+                    LedgerEntry toWriteEntry = toWriteIter.next();
+                    LedgerEntry toTestEntry = toTestIter.next();
+
+                    Assert.assertEquals(toWriteEntry.getLedgerId(), toTestEntry.getLedgerId());
+                    Assert.assertEquals(toWriteEntry.getEntryId(), toTestEntry.getEntryId());
+                    Assert.assertEquals(toWriteEntry.getLength(), toTestEntry.getLength());
+                    Assert.assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer());
+                }
+                Assert.assertFalse(toWriteIter.hasNext());
+                Assert.assertFalse(toTestIter.hasNext());
+            }
+        }
+    }
+
+    @Test
+    public void testOffloadReadInvalidEntryIds() throws Exception {
+        ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
+        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler,
+                                                                 DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
+        UUID uuid = UUID.randomUUID();
+        offloader.offload(toWrite, uuid, new HashMap<>()).get();
+
+        ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid).get();
+        Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed());
+
+        try {
+            toTest.read(-1, -1);
+            Assert.fail("Shouldn't be able to read anything");
+        } catch (BKException.BKIncorrectParameterException e) {
+        }
+
+        try {
+            toTest.read(0, toTest.getLastAddConfirmed() + 1);
+            Assert.fail("Shouldn't be able to read anything");
+        } catch (BKException.BKIncorrectParameterException e) {
         }
     }
 }
