Posted to commits@pulsar.apache.org by yo...@apache.org on 2021/10/22 09:21:16 UTC

[pulsar] branch branch-2.8 updated (f5fcf57 -> 90defac)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from f5fcf57  Fix cherry-pick issue
     new 1e290cc  Fix the read performance issue in the offload readAsync (#12443)
     new 6e9a32b  Add retry to tolerate the offload index file read failure (#12452)
     new 90defac  Reduce the readFailureBackoff time (#12444)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 conf/broker.conf                                   |  9 ++++
 conf/standalone.conf                               |  9 ++++
 .../apache/pulsar/broker/ServiceConfiguration.java | 21 ++++++++
 .../PersistentDispatcherSingleActiveConsumer.java  |  7 ++-
 .../impl/BlobStoreBackedInputStreamImpl.java       |  5 ++
 .../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 58 ++++++++++++++++------
 .../offload/jcloud/impl/OffloadIndexBlockImpl.java |  6 +--
 .../jcloud/BlobStoreBackedInputStreamTest.java     | 23 +++++++++
 .../impl/BlobStoreManagedLedgerOffloaderTest.java  | 19 +++++++
 9 files changed, 134 insertions(+), 23 deletions(-)

[pulsar] 03/03: Reduce the readFailureBackoff time (#12444)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 90defac3cb9522b88f53f16280a283bcc0156f93
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Fri Oct 22 17:03:38 2021 +0800

    Reduce the readFailureBackoff time (#12444)
    
    ### Motivation
    When reading entries hits an exception, the dispatcher backs off
    for 15s before starting the next read. A 15s delay is unacceptable
    for the consumer in most cases, so it's better to reduce the interval.
    
    ### Modifications
    Make the read failure backoff start from 1s.
    
    (cherry picked from commit 99c90a548085d0dbefcc613eac729368933811c6)
---
 conf/broker.conf                                    |  9 +++++++++
 conf/standalone.conf                                |  9 +++++++++
 .../apache/pulsar/broker/ServiceConfiguration.java  | 21 +++++++++++++++++++++
 .../PersistentDispatcherSingleActiveConsumer.java   |  7 +++++--
 4 files changed, 44 insertions(+), 2 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 6e4668b..0fe68a0 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -379,6 +379,15 @@ dispatcherMinReadBatchSize=1
 # Max number of entries to dispatch for a shared subscription. By default it is 20 entries.
 dispatcherMaxRoundRobinBatchSize=20
 
+# The read failure backoff initial time in milliseconds. By default it is 15s.
+dispatcherReadFailureBackoffInitialTimeInMs=15000
+
+# The read failure backoff max time in milliseconds. By default it is 60s.
+dispatcherReadFailureBackoffMaxTimeInMs=60000
+
+# The read failure backoff mandatory stop time in milliseconds. By default it is 0s.
+dispatcherReadFailureBackoffMandatoryStopTimeInMs=0
+
 # Precise dispatcher flow control according to history message number of each entry
 preciseDispatcherFlowControl=false
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index a76736a..7f52eca 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -236,6 +236,15 @@ dispatchThrottlingRateRelativeToPublishRate=false
 # backlog.
 dispatchThrottlingOnNonBacklogConsumerEnabled=true
 
+# The read failure backoff initial time in milliseconds. By default it is 15s.
+dispatcherReadFailureBackoffInitialTimeInMs=15000
+
+# The read failure backoff max time in milliseconds. By default it is 60s.
+dispatcherReadFailureBackoffMaxTimeInMs=60000
+
+# The read failure backoff mandatory stop time in milliseconds. By default it is 0s.
+dispatcherReadFailureBackoffMandatoryStopTimeInMs=0
+
 # Precise dispatcher flow control according to history message number of each entry
 preciseDispatcherFlowControl=false
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index c9b0c48..d0ca117 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -750,6 +750,27 @@ public class ServiceConfiguration implements PulsarConfiguration {
     @FieldContext(
         dynamic = true,
         category = CATEGORY_SERVER,
+        doc = "The read failure backoff initial time in milliseconds. By default it is 15s."
+    )
+    private int dispatcherReadFailureBackoffInitialTimeInMs = 15000;
+
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_SERVER,
+        doc = "The read failure backoff max time in milliseconds. By default it is 60s."
+    )
+    private int dispatcherReadFailureBackoffMaxTimeInMs = 60000;
+
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_SERVER,
+        doc = "The read failure backoff mandatory stop time in milliseconds. By default it is 0s."
+    )
+    private int dispatcherReadFailureBackoffMandatoryStopTimeInMs = 0;
+
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_SERVER,
         doc = "Max number of entries to dispatch for a shared subscription. By default it is 20 entries."
     )
     private int dispatcherMaxRoundRobinBatchSize = 20;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 31aae39..04b8f5a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -65,8 +65,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
     protected volatile boolean havePendingRead = false;
 
     protected volatile int readBatchSize;
-    protected final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS,
-            1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
+    protected final Backoff readFailureBackoff;
     private volatile ScheduledFuture<?> readOnActiveConsumerTask = null;
 
     private final RedeliveryTracker redeliveryTracker;
@@ -80,6 +79,10 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
                 : ""/* NonDurableCursor doesn't have name */);
         this.cursor = cursor;
         this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
+        this.readFailureBackoff = new Backoff(serviceConfig.getDispatcherReadFailureBackoffInitialTimeInMs(),
+            TimeUnit.MILLISECONDS, serviceConfig.getDispatcherReadFailureBackoffMaxTimeInMs(),
+            TimeUnit.MILLISECONDS, serviceConfig.getDispatcherReadFailureBackoffMandatoryStopTimeInMs(),
+            TimeUnit.MILLISECONDS);
         this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
         this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
     }
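
For context, here is a minimal sketch of what the three new settings control. This is not Pulsar's actual Backoff class (the real one also applies randomized jitter and a mandatory-stop rule); it only illustrates, assuming plain exponential doubling, how the initial and max values shape the delay between failed reads:

    import java.util.concurrent.TimeUnit;

    // Minimal sketch of a read-failure backoff: start at the initial delay,
    // double after each consecutive failure, cap at the max delay.
    public class ReadFailureBackoffSketch {
        private final long initialMs;
        private final long maxMs;
        private long nextMs;

        public ReadFailureBackoffSketch(long initial, TimeUnit initialUnit,
                                        long max, TimeUnit maxUnit) {
            this.initialMs = initialUnit.toMillis(initial);
            this.maxMs = maxUnit.toMillis(max);
            this.nextMs = this.initialMs;
        }

        // Delay in milliseconds to wait before the next read attempt.
        public long next() {
            long current = nextMs;
            nextMs = Math.min(maxMs, nextMs * 2);
            return current;
        }

        // A successful read resets the sequence to the initial delay.
        public void reset() {
            nextMs = initialMs;
        }

        public static void main(String[] args) {
            // Defaults from the new broker.conf settings: 15s initial, 60s max.
            ReadFailureBackoffSketch backoff = new ReadFailureBackoffSketch(
                15000, TimeUnit.MILLISECONDS, 60000, TimeUnit.MILLISECONDS);
            for (int i = 1; i <= 4; i++) {
                System.out.println("failure " + i + " -> wait " + backoff.next() + " ms");
            }
            // prints 15000, 30000, 60000, 60000
        }
    }

Since the new fields are marked dynamic, an operator can lower dispatcherReadFailureBackoffInitialTimeInMs (for example to 1000) at runtime instead of being stuck with the previously hard-coded 15s.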

[pulsar] 01/03: Fix the read performance issue in the offload readAsync (#12443)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1e290cc9d196f6072a564639ed8c3f00fb8e8a1e
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Fri Oct 22 11:15:55 2021 +0800

    Fix the read performance issue in the offload readAsync (#12443)
    
    ---
    
    *Motivation*
    
    In #12123, I added a seek operation to the readAsync method.
    It makes sure the data stream always seeks to the first entry position
    before reading, so it will not introduce an EOF exception.
    But the offload index groups a set of entries into a range, so
    the seek operation seeks to the position of the first entry in the range.
    That introduces a performance issue, because every read operation
    has to read from the first entry in the range until it finds the actual
    first requested entry.
    But if we simply remove the seek operation, the readAsync method will
    throw an EOF exception. This PR adds a limitation on the seek operation.
    
    *Modifications*
    
    Add an available method to the backed input stream so we can know
    how many bytes are left to read from the stream.
    
    (cherry picked from commit b4d05ac1bf5cddb613d93806e505ef8788e1acc0)
---
 .../impl/BlobStoreBackedInputStreamImpl.java       |  5 +++++
 .../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 26 +++++++++++++---------
 .../jcloud/BlobStoreBackedInputStreamTest.java     | 23 +++++++++++++++++++
 .../impl/BlobStoreManagedLedgerOffloaderTest.java  | 19 ++++++++++++++++
 4 files changed, 63 insertions(+), 10 deletions(-)

diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
index 6a204d5..e3fc68a 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
@@ -141,4 +141,9 @@ public class BlobStoreBackedInputStreamImpl extends BackedInputStream {
     public void close() {
         buffer.release();
     }
+
+    @Override
+    public int available() throws IOException {
+        return (int)(objectLen - cursor) + buffer.readableBytes();
+    }
 }
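
A short worked example of the arithmetic in available() above, with made-up numbers (standalone illustration, not Pulsar code): the bytes not yet fetched from the blob plus the bytes already buffered locally.

    public class AvailableSketch {
        public static void main(String[] args) {
            long objectLen = 2048;    // total size of the offloaded object
            long cursor = 1500;       // next offset to fetch from the blob store
            int buffered = 100;       // bytes fetched but not yet consumed

            // Same computation as available(): what remains in the blob
            // plus what is sitting unread in the local buffer.
            int available = (int) (objectLen - cursor) + buffered;
            System.out.println("available = " + available); // 648
        }
    }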
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index 2bf380d..98fdff4 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -105,6 +105,7 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
         CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
         executor.submit(() -> {
             List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
+            boolean seeked = false;
             try {
                 if (firstEntry > lastEntry
                     || firstEntry < 0
@@ -115,14 +116,13 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
                 long entriesToRead = (lastEntry - firstEntry) + 1;
                 long nextExpectedId = firstEntry;
 
-                // seek the position to the first entry position, otherwise we will get the unexpected entry ID when doing
-                // the first read, that would cause read an unexpected entry id which is out of range between firstEntry
-                // and lastEntry
-                // for example, when we get 1-10 entries at first, then the next request is get 2-9, the following code
-                // will read the entry id from the stream and that is not the correct entry id, so it will seek to the
-                // correct position then read the stream as normal. But the entry id may exceed the last entry id, that
-                // will cause we are hardly to know the edge of the request range.
-                inputStream.seek(index.getIndexEntryForEntry(firstEntry).getDataOffset());
+                // check that the data stream has enough data to read, to avoid throwing an EOF
+                // exception. 12 bytes means the stream still holds an entry header (a 4-byte length
+                // and an 8-byte entry ID) to read.
+                if (dataStream.available() < 12) {
+                    log.warn("Not enough data to read, only {} bytes are available,"
+                        + " seeking to the first entry {} to avoid an EOF exception", inputStream.available(), firstEntry);
+                    inputStream.seek(index.getIndexEntryForEntry(firstEntry).getDataOffset());
+                }
 
                 while (entriesToRead > 0) {
                     if (state == State.Closed) {
@@ -149,14 +149,20 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
                         log.warn("The read entry {} is not the expected entry {} but in the range of {} - {},"
                             + " seeking to the right position", entryId, nextExpectedId, nextExpectedId, lastEntry);
                         inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
-                        continue;
                     } else if (entryId < nextExpectedId
                         && !index.getIndexEntryForEntry(nextExpectedId).equals(index.getIndexEntryForEntry(entryId))) {
                         log.warn("Read an unexpected entry id {} which is smaller than the next expected entry id {}"
                         + ", seeking to the right position", entries, nextExpectedId);
                         inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
-                        continue;
                     } else if (entryId > lastEntry) {
+                        // in the normal case, the entry id increments in order. But if the read method is
+                        // accessed randomly, we should allow one seek to the right position, after which the
+                        // entry id should never exceed the last entry again.
+                        if (!seeked) {
+                            inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
+                            seeked = true;
+                            continue;
+                        }
                         log.info("Expected to read {}, but read {}, which is greater than last entry {}",
                             nextExpectedId, entryId, lastEntry);
                         throw new BKException.BKUnexpectedConditionException();
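
The 12-byte threshold above corresponds to the per-entry header the commit comment describes: a 4-byte length followed by an 8-byte entry ID, then the payload. A self-contained sketch of that framing (illustrative code, not the actual Pulsar block format implementation):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class EntryHeaderSketch {
        public static void main(String[] args) throws IOException {
            byte[] payload = "hello".getBytes();
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bos)) {
                out.writeInt(payload.length); // 4-byte entry length
                out.writeLong(42L);           // 8-byte entry id
                out.write(payload);           // entry payload
            }
            try (DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(bos.toByteArray()))) {
                if (in.available() >= 12) {   // same guard as in readAsync
                    int length = in.readInt();
                    long entryId = in.readLong();
                    byte[] data = new byte[length];
                    in.readFully(data);
                    System.out.println("entry " + entryId + ": " + new String(data));
                }
            }
        }
    }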
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java
index ffe8fb2..36541b4 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java
@@ -260,4 +260,27 @@ public class BlobStoreBackedInputStreamTest extends BlobStoreTestBase {
         toTest.seekForward(after);
         assertStreamsMatch(toTest, toCompare);
     }
+
+    @Test
+    public void testAvailable() throws IOException {
+        String objectKey = "testAvailable";
+        int objectSize = 2048;
+        RandomInputStream toWrite = new RandomInputStream(0, objectSize);
+        Payload payload = Payloads.newInputStreamPayload(toWrite);
+        payload.getContentMetadata().setContentLength((long)objectSize);
+        Blob blob = blobStore.blobBuilder(objectKey)
+            .payload(payload)
+            .contentLength(objectSize)
+            .build();
+        String ret = blobStore.putBlob(BUCKET, blob);
+        BackedInputStream bis = new BlobStoreBackedInputStreamImpl(
+            blobStore, BUCKET, objectKey, (k, md) -> {}, objectSize, 512);
+        Assert.assertEquals(bis.available(), objectSize);
+        bis.seek(500);
+        Assert.assertEquals(bis.available(), objectSize - 500);
+        bis.seek(1024);
+        Assert.assertEquals(bis.available(), 1024);
+        bis.seek(2048);
+        Assert.assertEquals(bis.available(), 0);
+    }
 }
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index 90d8b11..77dfc55 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -24,6 +24,7 @@ import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -477,4 +478,22 @@ public class BlobStoreManagedLedgerOffloaderTest extends BlobStoreManagedLedgerO
             Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version"));
         }
     }
+
+    @Test
+    public void testReadEOFException() throws Throwable {
+        ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
+        LedgerOffloader offloader = getOffloader();
+        UUID uuid = UUID.randomUUID();
+        offloader.offload(toWrite, uuid, new HashMap<>()).get();
+
+        ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get();
+        Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed());
+        toTest.readAsync(0, toTest.getLastAddConfirmed()).get();
+
+        try {
+            toTest.readAsync(0, 0).get();
+        } catch (Exception e) {
+            fail("Get unexpected exception when reading entries", e);
+        }
+    }
 }

[pulsar] 02/03: Add retry to tolerate the offload index file read failure (#12452)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6e9a32bd2fe26754e06221f1b4bcc38ad7dae54b
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Fri Oct 22 11:22:57 2021 +0800

    Add retry to tolerate the offload index file read failure (#12452)
    
    * Add retry to tolerate the offload index file read failure
    ---
    
    *Motivation*
    
    We hit a ReadLedgerMetadata exception when reading the index
    file. The index file was read with a single `read` call, which may
    not read all the data from the stream and so caused the metadata
    read to fail. We need to ensure that either all the data is read
    from the stream or the stream has ended. When the stream ends we
    receive an EOF exception, so we need to use `readFully`, not `read`.
    
    Add retry logic to tolerate failures caused by the network.
    Because the stream comes over HTTP, it may break in some cases.
    A small retry avoids pushing the failure back to the dispatcher's
    backoff.
    
    *Modifications*
    
    - Replace the `read` call with `readFully`
    - Add a small retry around building the index block
    
    * Add comments and enrich log
    
    (cherry picked from commit 33bcc17bbe07ffd9683556edecd9ce546b8fd93f)
---
 .../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 32 ++++++++++++++++++----
 .../offload/jcloud/impl/OffloadIndexBlockImpl.java |  6 +---
 2 files changed, 27 insertions(+), 11 deletions(-)

diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index 98fdff4..f4dc1b8 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -224,12 +224,32 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
                                   VersionCheck versionCheck,
                                   long ledgerId, int readBufferSize)
             throws IOException {
-        Blob blob = blobStore.getBlob(bucket, indexKey);
-        versionCheck.check(indexKey, blob);
-        OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create();
-        OffloadIndexBlock index;
-        try (InputStream payLoadStream = blob.getPayload().openStream()) {
-            index = (OffloadIndexBlock) indexBuilder.fromStream(payLoadStream);
+        int retryCount = 3;
+        OffloadIndexBlock index = null;
+        IOException lastException = null;
+        // The following retry avoids index file read failures caused by transient network issues.
+        // If the read cannot recover within the retries, we throw the exception and the
+        // dispatcher will schedule the next read.
+        // If we used a backoff to control the retry, it would introduce a concurrent operation.
+        // We don't want to make it complicated, because in most cases it shouldn't enter the retry loop.
+        while (retryCount-- > 0) {
+            Blob blob = blobStore.getBlob(bucket, indexKey);
+            versionCheck.check(indexKey, blob);
+            OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create();
+            try (InputStream payLoadStream = blob.getPayload().openStream()) {
+                index = (OffloadIndexBlock) indexBuilder.fromStream(payLoadStream);
+            } catch (IOException e) {
+                // retry to get past read failures caused by network issues
+                log.warn("Failed to get index block from the offloaded index file {}, still have {} retries left",
+                    indexKey, retryCount, e);
+                lastException = e;
+                continue;
+            }
+            lastException = null;
+            break;
+        }
+        if (lastException != null) {
+            throw lastException;
         }
 
         BackedInputStream inputStream = new BlobStoreBackedInputStreamImpl(blobStore, bucket, key,
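
The loop above is a plain bounded retry with no delay between attempts. Its generic shape, as an illustrative helper (hypothetical names, not a Pulsar API):

    import java.io.IOException;
    import java.util.concurrent.Callable;

    // Try an action a fixed number of times, remember the last IOException,
    // and rethrow it if every attempt fails.
    public final class BoundedRetry {
        private BoundedRetry() {}

        public static <T> T run(Callable<T> action, int attempts) throws Exception {
            IOException last = null;
            for (int i = 0; i < attempts; i++) {
                try {
                    return action.call();
                } catch (IOException e) {
                    last = e; // transient failure (e.g. a broken HTTP stream): try again
                }
            }
            if (last != null) {
                throw last;
            }
            throw new IllegalArgumentException("attempts must be > 0");
        }
    }

    // Hypothetical usage mirroring the code above:
    // OffloadIndexBlock index = BoundedRetry.run(() -> buildIndexBlock(blobStore, indexKey), 3);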
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
index 2f64089..a3fa14e 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
@@ -338,11 +338,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
         int segmentMetadataLength = dis.readInt();
 
         byte[] metadataBytes = new byte[segmentMetadataLength];
-
-        if (segmentMetadataLength != dis.read(metadataBytes)) {
-            log.error("Read ledgerMetadata from bytes failed");
-            throw new IOException("Read ledgerMetadata from bytes failed");
-        }
+        dis.readFully(metadataBytes);
         this.segmentMetadata = parseLedgerMetadata(metadataBytes);
 
         for (int i = 0; i < indexEntryCount; i++) {
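
Finally, a self-contained illustration of the read vs. readFully distinction the commit message relies on (TricklingStream below is a hypothetical stand-in for a slow HTTP stream, not Pulsar code): a single read(byte[]) call may legally return fewer bytes than requested, while DataInputStream.readFully loops until the buffer is completely filled or throws EOFException.

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class ReadFullySketch {
        // An InputStream that hands out at most 3 bytes per call,
        // mimicking a network stream that delivers data in small chunks.
        static class TricklingStream extends InputStream {
            private int remaining = 10;

            @Override
            public int read() {
                return remaining-- > 0 ? 7 : -1;
            }

            @Override
            public int read(byte[] b, int off, int len) {
                if (remaining <= 0) {
                    return -1;
                }
                int n = Math.min(len, Math.min(3, remaining));
                for (int i = 0; i < n; i++) {
                    b[off + i] = 7;
                }
                remaining -= n;
                return n;
            }
        }

        public static void main(String[] args) throws IOException {
            byte[] buf = new byte[10];
            int n = new TricklingStream().read(buf); // a short read: returns 3, not 10
            System.out.println("read returned " + n + " bytes");

            try (DataInputStream dis = new DataInputStream(new TricklingStream())) {
                dis.readFully(buf); // loops internally until all 10 bytes arrive
                System.out.println("readFully filled all " + buf.length + " bytes");
            }
        }
    }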