Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/26 14:39:37 UTC

[pulsar] branch branch-2.7 updated (adba3f3 -> 06bd85d)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from adba3f3  [Performance] Use single instance of parser (#10664)
     new 71b96a0  Revert "Creating a topic does not wait for creating cursor of replicators (#6364)" (#10674)
     new e37e0df  MINOR: Add error message to setMaxPendingMessagesAcrossPartitions (#10709)
     new f316469  [broker] Fix issue where StackOverflowError occurs when trying to redeliver a large number of already acked messages (#10696)
     new 06bd85d  Fix compile issue after cherry-pick

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../broker/service/AbstractBaseDispatcher.java     |  4 +-
 .../pulsar/broker/service/AbstractReplicator.java  | 38 ++++------
 .../nonpersistent/NonPersistentReplicator.java     |  5 --
 .../PersistentDispatcherMultipleConsumers.java     | 30 +++++---
 .../PersistentDispatcherSingleActiveConsumer.java  |  3 +-
 .../service/persistent/PersistentReplicator.java   | 84 +++-------------------
 ...istentStickyKeyDispatcherMultipleConsumers.java |  2 +-
 .../broker/service/persistent/PersistentTopic.java | 42 +++++++----
 .../broker/service/PersistentTopicE2ETest.java     |  4 ++
 .../impl/conf/ProducerConfigurationData.java       |  3 +-
 .../client/impl/ProducerBuilderImplTest.java       |  5 ++
 11 files changed, 88 insertions(+), 132 deletions(-)

[pulsar] 03/04: [broker] Fix issue where StackOverflowError occurs when trying to redeliver a large number of already acked messages (#10696)

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f316469d602438b18cdf859fae158e03b6c59dba
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Thu May 27 10:20:31 2021 +0900

    [broker] Fix issue where StackOverflowError occurs when trying to redeliver a large number of already acked messages (#10696)
    
    The other day, some of our broker servers got the following StackOverflowError:
    ```
    13:44:17.410 [pulsar-io-21-6] WARN  o.a.pulsar.broker.service.ServerCnx  - [/xxx.xxx.xxx.xxx:58438] Got exception StackOverflowError : null
    java.lang.StackOverflowError: null
            at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
            at java.util.TreeMap$KeySpliterator.forEachRemaining(TreeMap.java:2746)
            at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
            at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
            at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
            at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
            at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
            at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
            at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReplayEntries(ManagedCursorImpl.java:1086)
            at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReplayEntries(ManagedCursorImpl.java:1066)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.asyncReplayEntries(PersistentDispatcherMultipleConsumers.java:341)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:309)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
    ```
    
    This phenomenon can be reproduced by the following procedure:
    
    1. Store a large number of messages in the backlog of a topic
    2. Connect some Shared consumers to the topic. These consumers receive messages but do not acknowledge them at all
    3. Run skip-all to remove all messages from the backlog
    4. Add another consumer whose receiver queue size is small
    5. Close all the consumers added in step 2
    6. A StackOverflowError occurs on the broker
    
    If the broker receives a large number of redelivery requests for messages that have already been deleted, `PersistentDispatcherMultipleConsumers#readMoreEntries()` is called recursively on the same thread many times, and the stack eventually overflows with a StackOverflowError. The relevant code is here:
    https://github.com/apache/pulsar/blob/a6aed551026825ef2de6b1ac5916d17daf1af5c3/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L232-L252
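    
    The following is a minimal, self-contained sketch, not Pulsar code, that shows why the recursion matters. `ReadLoopSketch`, `readRecursively` and `readViaExecutor` are hypothetical names. The first method models a read path that finds only already-deleted entries and immediately asks for more by calling itself. The second hands each retry to a single-thread executor instead, which is the approach the fix takes with `topic.getBrokerService().executor().execute(() -> readMoreEntries())`.
    
    ```java
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    // Hypothetical model: each "read" finds nothing to dispatch and asks for more.
    final class ReadLoopSketch {
    
        // Recursive variant: every retry adds a stack frame, like the repeated
        // readMoreEntries() frames in the trace above.
        static int readRecursively(int remainingBatches, int depth) {
            if (remainingBatches == 0) {
                return depth;
            }
            return readRecursively(remainingBatches - 1, depth + 1);
        }
    
        // Executor variant: each retry is submitted as a new task, so the current
        // task returns before the next read starts. Returns the deepest stack
        // observed across all tasks.
        static int readViaExecutor(int batches) {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            CountDownLatch done = new CountDownLatch(1);
            AtomicInteger maxDepth = new AtomicInteger();
            AtomicInteger remaining = new AtomicInteger(batches);
            Runnable[] read = new Runnable[1];
            read[0] = () -> {
                maxDepth.accumulateAndGet(Thread.currentThread().getStackTrace().length, Math::max);
                if (remaining.getAndDecrement() > 0) {
                    executor.execute(read[0]); // hand off instead of recursing
                } else {
                    done.countDown();
                }
            };
            executor.execute(read[0]);
            try {
                done.await(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                executor.shutdown();
            }
            return maxDepth.get();
        }
    }
    ```
    
    With enough batches, for example `readRecursively(10_000_000, 0)`, the recursive variant throws StackOverflowError. By contrast, `readViaExecutor(10_000)` sees only a few frames per task, however many retries there are. The cost is one extra task hop per retry, and the benefit is bounded stack usage.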
    
    The fix has two parts:
    
    - Avoid calling `readMoreEntries()` recursively on the same thread; schedule the next call on the broker executor instead
    - If the dispatcher receives redelivery requests for messages at or before the mark-delete position, do not add them to `messagesToRedeliver`, because those messages have already been acknowledged
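    
    The second change is a single ordering check on (ledgerId, entryId). The sketch below isolates the comparison from the new `addMessageToReplay()` in the diff. It is an illustration, not the broker code: `Pos` is a hypothetical stand-in for `PositionImpl`, and `ReplayFilterSketch`, `shouldReplay` and `filterForReplay` are hypothetical helpers.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    
    // Hypothetical stand-in for PositionImpl: a (ledgerId, entryId) pair.
    final class Pos {
        final long ledgerId;
        final long entryId;
    
        Pos(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }
    
    final class ReplayFilterSketch {
    
        // Same comparison as the new addMessageToReplay(): queue a position only if
        // it is strictly after the mark-delete position. Positions at or before it
        // are already acknowledged. A null mark-delete position queues everything.
        static boolean shouldReplay(Pos markDelete, long ledgerId, long entryId) {
            return markDelete == null
                    || ledgerId > markDelete.ledgerId
                    || (ledgerId == markDelete.ledgerId && entryId > markDelete.entryId);
        }
    
        // Keeps only the requested positions that still need redelivery.
        static List<Pos> filterForReplay(Pos markDelete, List<Pos> requested) {
            List<Pos> toReplay = new ArrayList<>();
            for (Pos p : requested) {
                if (shouldReplay(markDelete, p.ledgerId, p.entryId)) {
                    toReplay.add(p);
                }
            }
            return toReplay;
        }
    }
    ```
    
    For a mark-delete position of (5, 10), requests for (4, 99), (5, 10), (5, 11) and (6, 0) leave only (5, 11) and (6, 0). In the actual change, the boolean result also decides whether `redeliveryTracker.addIfAbsent()` is called, so skipped positions are not tracked as redeliveries.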
    
    (cherry picked from commit 894d92b2be3bee334e7ce32760c4d2e7978603aa)
---
 .../broker/service/AbstractBaseDispatcher.java     |  4 +--
 .../PersistentDispatcherMultipleConsumers.java     | 30 ++++++++++++++--------
 .../PersistentDispatcherSingleActiveConsumer.java  |  3 ++-
 ...istentStickyKeyDispatcherMultipleConsumers.java |  2 +-
 .../broker/service/PersistentTopicE2ETest.java     |  3 +++
 5 files changed, 28 insertions(+), 14 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 2813d70..997ce5c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -200,8 +200,9 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
         return key;
     }
 
-    protected void addMessageToReplay(long ledgerId, long entryId) {
+    protected boolean addMessageToReplay(long ledgerId, long entryId) {
         // No-op
+        return false;
     }
 
     private void handleTxnCommitMarker(Entry entry) {
@@ -234,5 +235,4 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
             }
         });
     }
-
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 27b5e2a..3f61156 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -209,8 +209,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                     log.debug("[{}] Consumer are left, reading more entries", name);
                 }
                 consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, none) -> {
-                    messagesToRedeliver.add(ledgerId, entryId);
-                    redeliveryTracker.addIfAbsent(PositionImpl.get(ledgerId, entryId));
+                    if (addMessageToReplay(ledgerId, entryId)) {
+                        redeliveryTracker.addIfAbsent(PositionImpl.get(ledgerId, entryId));
+                    }
                 });
                 totalAvailablePermits -= consumer.getAvailablePermits();
                 if (log.isDebugEnabled()) {
@@ -337,7 +338,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 // next entries as readCompletedEntries-callback was never called
                 if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
                     havePendingReplayRead = false;
-                    readMoreEntries();
+                    // We should not call readMoreEntries() recursively in the same thread
+                    // as there is a risk of StackOverflowError
+                    topic.getBrokerService().executor().execute(() -> readMoreEntries());
                 }
             } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
                 log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,
@@ -560,7 +563,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                         entries.size() - start);
             }
             entries.subList(start, entries.size()).forEach(entry -> {
-                messagesToRedeliver.add(entry.getLedgerId(), entry.getEntryId());
+                addMessageToReplay(entry.getLedgerId(), entry.getEntryId());
                 entry.release();
             });
         }
@@ -677,7 +680,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     @Override
     public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
         consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, none) -> {
-            messagesToRedeliver.add(ledgerId, entryId);
+            addMessageToReplay(ledgerId, entryId);
         });
         if (log.isDebugEnabled()) {
             log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer,
@@ -689,8 +692,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     @Override
     public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
         positions.forEach(position -> {
-            messagesToRedeliver.add(position.getLedgerId(), position.getEntryId());
-            redeliveryTracker.addIfAbsent(position);
+            if (addMessageToReplay(position.getLedgerId(), position.getEntryId())) {
+                redeliveryTracker.addIfAbsent(position);
+            }
         });
         if (log.isDebugEnabled()) {
             log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, positions);
@@ -837,9 +841,15 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         }
     }
 
-    @Override
-    public void addMessageToReplay(long ledgerId, long entryId) {
-        this.messagesToRedeliver.add(ledgerId, entryId);
+    protected boolean addMessageToReplay(long ledgerId, long entryId) {
+        PositionImpl markDeletePosition = (PositionImpl) cursor.getMarkDeletedPosition();
+        if (markDeletePosition == null || ledgerId > markDeletePosition.getLedgerId()
+                || (ledgerId == markDeletePosition.getLedgerId() && entryId > markDeletePosition.getEntryId())) {
+            messagesToRedeliver.add(ledgerId, entryId);
+            return true;
+        } else {
+            return false;
+        }
     }
 
     public PersistentTopic getTopic() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index a5c8a5f..2930a33 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -598,8 +598,9 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
     }
 
     @Override
-    public void addMessageToReplay(long ledgerId, long entryId) {
+    public boolean addMessageToReplay(long ledgerId, long entryId) {
         this.messagesToRedeliver.add(ledgerId, entryId);
+        return false;
     }
 
     private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 27bcf97..45921dd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -200,7 +200,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                 // so we discard for now and mark them for later redelivery
                 for (int i = messagesForC; i < entriesWithSameKeyCount; i++) {
                     Entry entry = entriesWithSameKey.get(i);
-                    messagesToRedeliver.add(entry.getLedgerId(), entry.getEntryId());
+                    addMessageToReplay(entry.getLedgerId(), entry.getEntryId());
                     entry.release();
                     entriesWithSameKey.set(i, null);
                 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 2b6e9e2..407593f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -1603,6 +1603,9 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
         replayMap.set(dispatcher, messagesToReplay);
         // (a) redelivery with all acked-message should clear messageReply bucket
         dispatcher.redeliverUnacknowledgedMessages(dispatcher.getConsumers().get(0));
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
+            return messagesToReplay.isEmpty();
+        });
         assertEquals(messagesToReplay.size(), 0);
 
         // (b) fill messageReplyBucket with already acked entry again: and try to publish new msg and read it

[pulsar] 01/04: Revert "Creating a topic does not wait for creating cursor of replicators (#6364)" (#10674)

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 71b96a0579642bd6f8e8da0e59e041ec37304abe
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat May 22 10:13:55 2021 -0700

    Revert "Creating a topic does not wait for creating cursor of replicators (#6364)" (#10674)
    
    This reverts commit 336e971f4d41d6ffb26b3b53a20f36a360c070e8.
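    
    After this revert, `PersistentTopic` opens the replicator cursor itself with `ledger.asyncOpenCursor(...)` and completes the start future from the callback. It constructs the `PersistentReplicator` only after the cursor is open. The replicator therefore always receives a non-null, `final` cursor, which is why the `expiryMonitor != null` checks are dropped in the diff. The sketch below shows that callback-to-`CompletableFuture` ordering in isolation. `StartReplicatorSketch`, `Ledger`, `OpenCallback` and `Replicator` are hypothetical, simplified stand-ins, not the Pulsar or BookKeeper types.
    
    ```java
    import java.util.Objects;
    import java.util.concurrent.CompletableFuture;
    
    final class StartReplicatorSketch {
    
        // Hypothetical callback, loosely shaped like an open-cursor callback.
        interface OpenCallback {
            void openComplete(String cursorName);
    
            void openFailed(Exception exception);
        }
    
        // Hypothetical ledger that opens cursors asynchronously.
        interface Ledger {
            void asyncOpenCursor(String name, OpenCallback callback);
        }
    
        // The replicator requires an already-open cursor, so it never has to
        // handle a "cursor not yet open" state.
        static final class Replicator {
            final String cursorName;
    
            Replicator(String cursorName) {
                this.cursorName = Objects.requireNonNull(cursorName);
            }
        }
    
        // Opens the cursor first. The returned future completes only once the
        // replicator has been built around that cursor, or fails if opening fails.
        static CompletableFuture<Replicator> startReplicator(Ledger ledger, String name) {
            CompletableFuture<Replicator> future = new CompletableFuture<>();
            ledger.asyncOpenCursor(name, new OpenCallback() {
                @Override
                public void openComplete(String cursorName) {
                    future.complete(new Replicator(cursorName));
                }
    
                @Override
                public void openFailed(Exception exception) {
                    future.completeExceptionally(exception);
                }
            });
            return future;
        }
    }
    ```
    
    This is the trade-off the revert restores. Creating a topic waits again for the replicator cursors to open, and in exchange the replicator has no partially initialized state to guard against.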
    
    (cherry picked from commit e4864921358b89a32b49bc967a6e582fce992988)
---
 .../pulsar/broker/service/AbstractReplicator.java  | 38 ++++------
 .../nonpersistent/NonPersistentReplicator.java     |  5 --
 .../service/persistent/PersistentReplicator.java   | 83 +++-------------------
 .../broker/service/persistent/PersistentTopic.java | 42 +++++++----
 4 files changed, 52 insertions(+), 116 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index a55d21a..00801e5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -122,38 +122,30 @@ public abstract class AbstractReplicator {
                 log.info("[{}][{} -> {}] Replicator already being started. Replicator state: {}", topicName,
                         localCluster, remoteCluster, state);
             }
+
             return;
         }
 
         log.info("[{}][{} -> {}] Starting replicator", topicName, localCluster, remoteCluster);
-        openCursorAsync().thenAccept(v ->
-            producerBuilder.createAsync()
-                .thenAccept(this::readEntries)
-                .exceptionally(ex -> {
-                    retryCreateProducer(ex);
-                    return null;
-        })).exceptionally(ex -> {
-            retryCreateProducer(ex);
+        producerBuilder.createAsync().thenAccept(producer -> {
+            readEntries(producer);
+        }).exceptionally(ex -> {
+            if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) {
+                long waitTimeMs = backOff.next();
+                log.warn("[{}][{} -> {}] Failed to create remote producer ({}), retrying in {} s", topicName,
+                        localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0);
+
+                // BackOff before retrying
+                brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
+            } else {
+                log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: {}", topicName,
+                        localCluster, remoteCluster, STATE_UPDATER.get(this), ex);
+            }
             return null;
         });
-    }
 
-    private void retryCreateProducer(Throwable ex) {
-        if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) {
-            long waitTimeMs = backOff.next();
-            log.warn("[{}][{} -> {}] Failed to create remote producer ({}), retrying in {} s", topicName,
-                localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0);
-
-            // BackOff before retrying
-            brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
-        } else {
-            log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: {}", topicName,
-                localCluster, remoteCluster, STATE_UPDATER.get(this), ex);
-        }
     }
 
-    protected abstract CompletableFuture<Void> openCursorAsync();
-
     protected synchronized CompletableFuture<Void> closeProducerAsync() {
         if (producer == null) {
             STATE_UPDATER.set(this, State.Stopped);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index 8e4c53a..be9dbff0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -252,11 +252,6 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
     }
 
     @Override
-    protected CompletableFuture<Void> openCursorAsync() {
-        return CompletableFuture.completedFuture(null);
-    }
-
-    @Override
     public boolean isConnected() {
         ProducerImpl<?> producer = this.producer;
         return producer != null && producer.isConnected();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 8188a61..8c5428b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -34,13 +34,11 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
-import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
-import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
@@ -48,7 +46,6 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
-import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
 import org.apache.pulsar.broker.service.Replicator;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
@@ -70,9 +67,7 @@ import org.slf4j.LoggerFactory;
 public class PersistentReplicator extends AbstractReplicator implements Replicator, ReadEntriesCallback, DeleteCallback {
 
     private final PersistentTopic topic;
-    private final String replicatorName;
-    private final ManagedLedger ledger;
-    protected volatile ManagedCursor cursor;
+    protected final ManagedCursor cursor;
 
     private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
 
@@ -99,41 +94,19 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
 
     private final Backoff readFailureBackoff = new Backoff(1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
 
-    private PersistentMessageExpiryMonitor expiryMonitor;
+    private final PersistentMessageExpiryMonitor expiryMonitor;
     // for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold
     private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
 
     private final ReplicatorStats stats = new ReplicatorStats();
 
-    // Only for test
     public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster,
             BrokerService brokerService) throws NamingException {
         super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);
-        this.replicatorName = cursor.getName();
-        this.ledger = cursor.getManagedLedger();
-        this.cursor = cursor;
-        this.topic = topic;
-        this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor, null);
-        HAVE_PENDING_READ_UPDATER.set(this, FALSE);
-        PENDING_MESSAGES_UPDATER.set(this, 0);
-
-        readBatchSize = Math.min(
-            producerQueueSize,
-            topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize());
-        readMaxSizeBytes = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadSizeBytes();
-        producerQueueThreshold = (int) (producerQueueSize * 0.9);
-
-        this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
-
-        startProducer();
-    }
-
-    public PersistentReplicator(PersistentTopic topic, String replicatorName, String localCluster, String remoteCluster,
-            BrokerService brokerService, ManagedLedger ledger) throws NamingException {
-        super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);
-        this.replicatorName = replicatorName;
-        this.ledger = ledger;
         this.topic = topic;
+        this.cursor = cursor;
+        this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName,
+                Codec.decode(cursor.getName()), cursor, null);
         HAVE_PENDING_READ_UPDATER.set(this, FALSE);
         PENDING_MESSAGES_UPDATER.set(this, 0);
 
@@ -192,37 +165,6 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
         }
     }
 
-    @Override
-    protected CompletableFuture<Void> openCursorAsync() {
-        log.info("[{}][{} -> {}] Starting open cursor for replicator", topicName, localCluster, remoteCluster);
-        if (cursor != null) {
-            log.info("[{}][{} -> {}] Using the exists cursor for replicator", topicName, localCluster, remoteCluster);
-            if (expiryMonitor == null) {
-                this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor, null);
-            }
-            return CompletableFuture.completedFuture(null);
-        }
-        CompletableFuture<Void> res = new CompletableFuture<>();
-        ledger.asyncOpenCursor(replicatorName, InitialPosition.Earliest, new OpenCursorCallback() {
-            @Override
-            public void openCursorComplete(ManagedCursor cursor, Object ctx) {
-                log.info("[{}][{} -> {}] Open cursor succeed for replicator", topicName, localCluster, remoteCluster);
-                PersistentReplicator.this.cursor = cursor;
-                PersistentReplicator.this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor, null);
-                res.complete(null);
-            }
-
-            @Override
-            public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
-                log.warn("[{}][{} -> {}] Open cursor failed for replicator", topicName, localCluster, remoteCluster, exception);
-                res.completeExceptionally(new PersistenceException(exception));
-            }
-
-        }, null);
-        return res;
-    }
-
-
     /**
      * Calculate available permits for read entries.
      *
@@ -671,9 +613,7 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
         msgExpired.calculateRate();
         stats.msgRateOut = msgOut.getRate();
         stats.msgThroughputOut = msgOut.getValueRate();
-        if (expiryMonitor != null) {
-            stats.msgRateExpired = msgExpired.getRate() + expiryMonitor.getMessageExpiryRate();
-        }
+        stats.msgRateExpired = msgExpired.getRate() + expiryMonitor.getMessageExpiryRate();
     }
 
     public ReplicatorStats getStats() {
@@ -711,17 +651,12 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
             // don't do anything for almost caught-up connected subscriptions
             return false;
         }
-        if (expiryMonitor != null) {
-            return expiryMonitor.expireMessages(messageTTLInSeconds);
-        }
-        return false;
+
+        return expiryMonitor.expireMessages(messageTTLInSeconds);
     }
 
     public boolean expireMessages(Position position) {
-        if (expiryMonitor != null) {
-            return expiryMonitor.expireMessages(position);
-        }
-        return false;
+        return expiryMonitor.expireMessages(position);
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f4f6e49..bae9a77 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -245,7 +245,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             if (cursor.getName().startsWith(replicatorPrefix)) {
                 String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
                 String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
-                boolean isReplicatorStarted = addReplicationCluster(remoteCluster, this, cursor.getName(), localCluster);
+                boolean isReplicatorStarted = false;
+                try {
+                    isReplicatorStarted = addReplicationCluster(remoteCluster, cursor, localCluster);
+                } catch (Exception e) {
+                    log.warn("[{}] failed to start replication", topic, e);
+                }
                 if (!isReplicatorStarted) {
                     throw new NamingException(
                         PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster);
@@ -1272,26 +1277,35 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster);
         final CompletableFuture<Void> future = new CompletableFuture<>();
 
-        String replicatorName = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
-        String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
-        boolean isReplicatorStarted = addReplicationCluster(remoteCluster, PersistentTopic.this, replicatorName, localCluster);
-        if (isReplicatorStarted) {
-            future.complete(null);
-        } else {
-            future.completeExceptionally(new NamingException(
-                PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster));
-        }
+        String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
+        ledger.asyncOpenCursor(name, new OpenCursorCallback() {
+            @Override
+            public void openCursorComplete(ManagedCursor cursor, Object ctx) {
+                String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
+                boolean isReplicatorStarted = addReplicationCluster(remoteCluster, cursor, localCluster);
+                if (isReplicatorStarted) {
+                    future.complete(null);
+                } else {
+                    future.completeExceptionally(new NamingException(
+                            PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster));
+                }
+            }
 
+            @Override
+            public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
+                future.completeExceptionally(new PersistenceException(exception));
+            }
+
+        }, null);
         return future;
     }
 
-    protected boolean addReplicationCluster(String remoteCluster, PersistentTopic persistentTopic, String replicatorName,
-            String localCluster) {
+    protected boolean addReplicationCluster(String remoteCluster, ManagedCursor cursor, String localCluster) {
         AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
         replicators.computeIfAbsent(remoteCluster, r -> {
             try {
-                return new PersistentReplicator(PersistentTopic.this, replicatorName, localCluster, remoteCluster,
-                        brokerService, ledger);
+                return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster,
+                        brokerService);
             } catch (NamingException e) {
                 isReplicatorStarted.set(false);
                 log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);

[pulsar] 02/04: MINOR: Add error message to setMaxPendingMessagesAcrossPartitions (#10709)

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e37e0df5a1f865ce5dbe5da05c183f6d9ac15c62
Author: wenbingshen <ol...@gmail.com>
AuthorDate: Thu May 27 09:24:07 2021 +0800

    MINOR: Add error message to setMaxPendingMessagesAcrossPartitions (#10709)
    
    ### Motivation
    When maxPendingMessagesAcrossPartitions is less than maxPendingMessages, the IllegalArgumentException thrown by setMaxPendingMessagesAcrossPartitions carries no message, so callers only see "null".
    
    ### Modifications
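    Pass the message "maxPendingMessagesAcrossPartitions needs to be >= maxPendingMessages" to checkArgument so the exception explains the failure, and add a test that asserts this message.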
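
    The sketch below shows why the one-argument check surfaces "null" and what the two-argument form changes. It is a minimal, self-contained illustration, not the real ProducerConfigurationData: the class PendingLimitsSketch and its two checkArgument helpers are hypothetical stand-ins that mimic the behaviour of Guava's Preconditions.checkArgument overloads, and the maxPendingMessages value of 1000 is an assumption chosen so that 999 is rejected, as in the test above.

```java
// Hypothetical stand-in for ProducerConfigurationData; not the real Pulsar class.
public class PendingLimitsSketch {
    // Assumed limit so that 999 is rejected, matching the test's input.
    private int maxPendingMessages = 1000;
    private int maxPendingMessagesAcrossPartitions = 50000;

    // Mimics Guava's checkArgument(boolean): the exception has no message,
    // so getMessage() returns null.
    static void checkArgument(boolean expression) {
        if (!expression) {
            throw new IllegalArgumentException();
        }
    }

    // Mimics Guava's checkArgument(boolean, Object): the message is
    // String.valueOf(errorMessage).
    static void checkArgument(boolean expression, Object errorMessage) {
        if (!expression) {
            throw new IllegalArgumentException(String.valueOf(errorMessage));
        }
    }

    // Patched setter: rejects values below maxPendingMessages with a readable message.
    public void setMaxPendingMessagesAcrossPartitions(int value) {
        checkArgument(value >= maxPendingMessages,
                "maxPendingMessagesAcrossPartitions needs to be >= maxPendingMessages");
        this.maxPendingMessagesAcrossPartitions = value;
    }

    public int getMaxPendingMessagesAcrossPartitions() {
        return maxPendingMessagesAcrossPartitions;
    }

    public static void main(String[] args) {
        PendingLimitsSketch conf = new PendingLimitsSketch();
        try {
            // Old behaviour: one-argument check, no message.
            checkArgument(999 >= conf.maxPendingMessages);
        } catch (IllegalArgumentException e) {
            System.out.println("before: " + e.getMessage());
        }
        try {
            // New behaviour: the setter supplies an explanatory message.
            conf.setMaxPendingMessagesAcrossPartitions(999);
        } catch (IllegalArgumentException e) {
            System.out.println("after: " + e.getMessage());
        }
    }
}
```

    Running main prints "before: null" followed by "after: maxPendingMessagesAcrossPartitions needs to be >= maxPendingMessages", which is the message the new TestNG test matches with expectedExceptionsMessageRegExp.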
    
    (cherry picked from commit 2cfed8cd62655c79828ebebb81d20736c6fc2fa4)
---
 .../apache/pulsar/client/impl/conf/ProducerConfigurationData.java    | 3 ++-
 .../java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java  | 5 +++++
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index c48598a..3100952 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -133,7 +133,8 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
     }
 
     public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
-        checkArgument(maxPendingMessagesAcrossPartitions >= maxPendingMessages);
+        checkArgument(maxPendingMessagesAcrossPartitions >= maxPendingMessages,
+                "maxPendingMessagesAcrossPartitions needs to be >= maxPendingMessages");
         this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
     }
 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
index b3cda2e..18945cc 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
@@ -343,6 +343,11 @@ public class ProducerBuilderImplTest {
         producerBuilderImpl.maxPendingMessagesAcrossPartitions(999);
     }
 
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "maxPendingMessagesAcrossPartitions needs to be >= maxPendingMessages")
+    public void testProducerBuilderImplWhenMaxPendingMessagesAcrossPartitionsPropertyIsInvalidErrorMessages() {
+        producerBuilderImpl.maxPendingMessagesAcrossPartitions(999);
+    }
+
     @Test
     public void testProducerBuilderImplWhenNumericPropertiesAreValid() {
         producerBuilderImpl.batchingMaxPublishDelay(1, TimeUnit.SECONDS);

[pulsar] 04/04: Fix compile issue after cherry-pick

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 06bd85d804cf4b6ad229e17be79ffc1a004d7af4
Author: penghui <pe...@apache.org>
AuthorDate: Sat Jun 26 22:38:35 2021 +0800

    Fix compile issue after cherry-pick
---
 .../apache/pulsar/broker/service/persistent/PersistentReplicator.java    | 1 -
 .../java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java    | 1 +
 2 files changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 8c5428b..d51103b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -55,7 +55,6 @@ import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.SendCallback;
-import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.api.proto.PulsarMarkers.MarkerType;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 407593f..c7344d6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -82,6 +82,7 @@ import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;