You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/20 11:57:05 UTC

[pulsar] 15/22: Fix the read performance issue in the offload readAsync (#12443)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5e64b51b31e9d4b50907194b75dacca8de584f07
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Fri Oct 22 11:15:55 2021 +0800

    Fix the read performance issue in the offload readAsync (#12443)
    
    ---
    
    *Motivation*
    
    In #12123, I added a seek operation to the readAsync method.
    It makes sure the data stream always seeks to the position of the first
    entry to read, so that reading does not hit an EOF exception.
    However, an offload index entry groups a set of entries into a range, so
    the seek operation moves the position to the first entry in that range.
    That introduces a performance issue, because every read operation will
    read from the first entry in the range until it finds the first entry
    that was actually requested.
    But if we remove the seek operation, the readAsync method can throw an
    EOF exception. This PR limits when the seek operation is performed.
    
    *Modifications*
    
    Add an available method to the backedInputStream to find out how many
    bytes we can read from the stream.
    
    (cherry picked from commit b4d05ac1bf5cddb613d93806e505ef8788e1acc0)
---
 .../impl/BlobStoreBackedInputStreamImpl.java       |  5 +++++
 .../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 26 +++++++++++++---------
 .../jcloud/BlobStoreBackedInputStreamTest.java     | 23 +++++++++++++++++++
 .../impl/BlobStoreManagedLedgerOffloaderTest.java  | 19 ++++++++++++++++
 4 files changed, 63 insertions(+), 10 deletions(-)

diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
index 6a204d5..e3fc68a 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
@@ -141,4 +141,9 @@ public class BlobStoreBackedInputStreamImpl extends BackedInputStream {
     public void close() {
         buffer.release();
     }
+
+    @Override
+    public int available() throws IOException {
+        return (int)(objectLen - cursor) + buffer.readableBytes();
+    }
 }
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index 2bf380d..98fdff4 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -105,6 +105,7 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
         CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
         executor.submit(() -> {
             List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
+            boolean seeked = false;
             try {
                 if (firstEntry > lastEntry
                     || firstEntry < 0
@@ -115,14 +116,13 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
                 long entriesToRead = (lastEntry - firstEntry) + 1;
                 long nextExpectedId = firstEntry;
 
-                // seek the position to the first entry position, otherwise we will get the unexpected entry ID when doing
-                // the first read, that would cause read an unexpected entry id which is out of range between firstEntry
-                // and lastEntry
-                // for example, when we get 1-10 entries at first, then the next request is get 2-9, the following code
-                // will read the entry id from the stream and that is not the correct entry id, so it will seek to the
-                // correct position then read the stream as normal. But the entry id may exceed the last entry id, that
-                // will cause we are hardly to know the edge of the request range.
-                inputStream.seek(index.getIndexEntryForEntry(firstEntry).getDataOffset());
+                // checking the data stream has enough data to read to avoid throw EOF exception when reading data.
+                // 12 bytes represent the stream have the length and entryID to read.
+                if (dataStream.available() < 12) {
+                    log.warn("There hasn't enough data to read, current available data has {} bytes,"
+                        + " seek to the first entry {} to avoid EOF exception", inputStream.available(), firstEntry);
+                    inputStream.seek(index.getIndexEntryForEntry(firstEntry).getDataOffset());
+                }
 
                 while (entriesToRead > 0) {
                     if (state == State.Closed) {
@@ -149,14 +149,20 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
                         log.warn("The read entry {} is not the expected entry {} but in the range of {} - {},"
                             + " seeking to the right position", entryId, nextExpectedId, nextExpectedId, lastEntry);
                         inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
-                        continue;
                     } else if (entryId < nextExpectedId
                         && !index.getIndexEntryForEntry(nextExpectedId).equals(index.getIndexEntryForEntry(entryId))) {
                         log.warn("Read an unexpected entry id {} which is smaller than the next expected entry id {}"
                         + ", seeking to the right position", entries, nextExpectedId);
                         inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
-                        continue;
                     } else if (entryId > lastEntry) {
+                        // in the normal case, the entry id should increment in order. But if there has random access in
+                        // the read method, we should allow to seek to the right position and the entry id should
+                        // never over to the last entry again.
+                        if (!seeked) {
+                            inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
+                            seeked = true;
+                            continue;
+                        }
                         log.info("Expected to read {}, but read {}, which is greater than last entry {}",
                             nextExpectedId, entryId, lastEntry);
                         throw new BKException.BKUnexpectedConditionException();
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java
index ffe8fb2..36541b4 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java
@@ -260,4 +260,27 @@ public class BlobStoreBackedInputStreamTest extends BlobStoreTestBase {
         toTest.seekForward(after);
         assertStreamsMatch(toTest, toCompare);
     }
+
+    @Test
+    public void testAvailable() throws IOException {
+        String objectKey = "testAvailable";
+        int objectSize = 2048;
+        RandomInputStream toWrite = new RandomInputStream(0, objectSize);
+        Payload payload = Payloads.newInputStreamPayload(toWrite);
+        payload.getContentMetadata().setContentLength((long)objectSize);
+        Blob blob = blobStore.blobBuilder(objectKey)
+            .payload(payload)
+            .contentLength(objectSize)
+            .build();
+        String ret = blobStore.putBlob(BUCKET, blob);
+        BackedInputStream bis = new BlobStoreBackedInputStreamImpl(
+            blobStore, BUCKET, objectKey, (k, md) -> {}, objectSize, 512);
+        Assert.assertEquals(bis.available(), objectSize);
+        bis.seek(500);
+        Assert.assertEquals(bis.available(), objectSize - 500);
+        bis.seek(1024);
+        Assert.assertEquals(bis.available(), 1024);
+        bis.seek(2048);
+        Assert.assertEquals(bis.available(), 0);
+    }
 }
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index 90d8b11..77dfc55 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -24,6 +24,7 @@ import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -477,4 +478,22 @@ public class BlobStoreManagedLedgerOffloaderTest extends BlobStoreManagedLedgerO
             Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version"));
         }
     }
+
+    @Test
+    public void testReadEOFException() throws Throwable {
+        ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
+        LedgerOffloader offloader = getOffloader();
+        UUID uuid = UUID.randomUUID();
+        offloader.offload(toWrite, uuid, new HashMap<>()).get();
+
+        ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get();
+        Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed());
+        toTest.readAsync(0, toTest.getLastAddConfirmed()).get();
+
+        try {
+            toTest.readAsync(0, 0).get();
+        } catch (Exception e) {
+            fail("Get unexpected exception when reading entries", e);
+        }
+    }
 }