You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/03/13 06:10:42 UTC

[pulsar] 01/17: Creating a topic does not wait for creating cursor of replicators (#6364)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 76dfd14f07409a8697cc32482df9b053b1124aaa
Author: lipenghui <pe...@apache.org>
AuthorDate: Sun Feb 23 19:43:48 2020 +0800

    Creating a topic does not wait for creating cursor of replicators (#6364)
    
    ### Motivation
    
    Creating a topic should not wait for the replicator cursors to be created; each replicator now opens its cursor asynchronously when it starts its producer.
    
    ## Verifying this change
    
    The existing unit tests cover this change.
    (cherry picked from commit 336e971f4d41d6ffb26b3b53a20f36a360c070e8)
---
 .../pulsar/broker/service/AbstractReplicator.java  | 38 +++++++-----
 .../nonpersistent/NonPersistentReplicator.java     |  5 ++
 .../service/persistent/PersistentReplicator.java   | 71 ++++++++++++++++++++--
 .../broker/service/persistent/PersistentTopic.java | 37 ++++-------
 4 files changed, 108 insertions(+), 43 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index d9b2f8e..13cd091 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -122,30 +122,38 @@ public abstract class AbstractReplicator {
                 log.info("[{}][{} -> {}] Replicator already being started. Replicator state: {}", topicName,
                         localCluster, remoteCluster, state);
             }
-
             return;
         }
 
         log.info("[{}][{} -> {}] Starting replicator", topicName, localCluster, remoteCluster);
-        producerBuilder.createAsync().thenAccept(producer -> {
-            readEntries(producer);
-        }).exceptionally(ex -> {
-            if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) {
-                long waitTimeMs = backOff.next();
-                log.warn("[{}][{} -> {}] Failed to create remote producer ({}), retrying in {} s", topicName,
-                        localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0);
-
-                // BackOff before retrying
-                brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
-            } else {
-                log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: {}", topicName,
-                        localCluster, remoteCluster, STATE_UPDATER.get(this), ex);
-            }
+        openCursorAsync().thenAccept(v ->
+            producerBuilder.createAsync()
+                .thenAccept(this::readEntries)
+                .exceptionally(ex -> {
+                    retryCreateProducer(ex);
+                    return null;
+        })).exceptionally(ex -> {
+            retryCreateProducer(ex);
             return null;
         });
+    }
 
+    private void retryCreateProducer(Throwable ex) {
+        if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) {
+            long waitTimeMs = backOff.next();
+            log.warn("[{}][{} -> {}] Failed to create remote producer ({}), retrying in {} s", topicName,
+                localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0);
+
+            // BackOff before retrying
+            brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
+        } else {
+            log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: {}", topicName,
+                localCluster, remoteCluster, STATE_UPDATER.get(this), ex);
+        }
     }
 
+    protected abstract CompletableFuture<Void> openCursorAsync();
+
     protected synchronized CompletableFuture<Void> closeProducerAsync() {
         if (producer == null) {
             STATE_UPDATER.set(this, State.Stopped);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index b6ea53a..c109560 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -252,6 +252,11 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
     }
 
     @Override
+    protected CompletableFuture<Void> openCursorAsync() {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
     public boolean isConnected() {
         ProducerImpl<?> producer = this.producer;
         return producer != null && producer.isConnected();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 2bc18ca..98c5d5a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service.persistent;
 
 import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
@@ -33,11 +34,13 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
+import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
@@ -46,6 +49,7 @@ import org.apache.bookkeeper.mledger.util.Rate;
 import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
+import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
 import org.apache.pulsar.broker.service.Replicator;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
@@ -55,6 +59,7 @@ import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.SendCallback;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.api.proto.PulsarMarkers.MarkerType;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
@@ -65,7 +70,9 @@ import org.slf4j.LoggerFactory;
 public class PersistentReplicator extends AbstractReplicator implements Replicator, ReadEntriesCallback, DeleteCallback {
 
     private final PersistentTopic topic;
-    private final ManagedCursor cursor;
+    private final String replicatorName;
+    private final ManagedLedger ledger;
+    protected ManagedCursor cursor;
 
     private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
 
@@ -97,11 +104,14 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
 
     private final ReplicatorStats stats = new ReplicatorStats();
 
+    // Only for test
     public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster,
             BrokerService brokerService) throws NamingException {
         super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);
-        this.topic = topic;
+        this.replicatorName = cursor.getName();
+        this.ledger = cursor.getManagedLedger();
         this.cursor = cursor;
+        this.topic = topic;
         this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor);
         HAVE_PENDING_READ_UPDATER.set(this, FALSE);
         PENDING_MESSAGES_UPDATER.set(this, 0);
@@ -116,6 +126,25 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
         startProducer();
     }
 
+    public PersistentReplicator(PersistentTopic topic, String replicatorName, String localCluster, String remoteCluster,
+            BrokerService brokerService, ManagedLedger ledger) throws NamingException {
+        super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);
+        this.replicatorName = replicatorName;
+        this.ledger = ledger;
+        this.topic = topic;
+        HAVE_PENDING_READ_UPDATER.set(this, FALSE);
+        PENDING_MESSAGES_UPDATER.set(this, 0);
+
+        readBatchSize = Math.min(
+            producerQueueSize,
+            topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize());
+        producerQueueThreshold = (int) (producerQueueSize * 0.9);
+
+        this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
+
+        startProducer();
+    }
+
     @Override
     protected void readEntries(org.apache.pulsar.client.api.Producer<byte[]> producer) {
         // Rewind the cursor to be sure to read again all non-acked messages sent while restarting
@@ -158,6 +187,36 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
         this.cursor.setInactive();
     }
 
+    @Override
+    protected synchronized CompletableFuture<Void> openCursorAsync() {
+        log.info("[{}][{} -> {}] Starting open cursor for replicator", topicName, localCluster, remoteCluster);
+        if (cursor != null) {
+            log.info("[{}][{} -> {}] Using the exists cursor for replicator", topicName, localCluster, remoteCluster);
+            if (expiryMonitor == null) {
+                this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor);
+            }
+            return CompletableFuture.completedFuture(null);
+        }
+        CompletableFuture<Void> res = new CompletableFuture<>();
+        ledger.asyncOpenCursor(replicatorName, InitialPosition.Earliest, new OpenCursorCallback() {
+            @Override
+            public void openCursorComplete(ManagedCursor cursor, Object ctx) {
+                log.info("[{}][{} -> {}] Open cursor succeed for replicator", topicName, localCluster, remoteCluster);
+                PersistentReplicator.this.cursor = cursor;
+                PersistentReplicator.this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor);
+                res.complete(null);
+            }
+
+            @Override
+            public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
+                log.warn("[{}][{} -> {}] Open cursor failed for replicator", topicName, localCluster, remoteCluster, exception);
+                res.completeExceptionally(new PersistenceException(exception));
+            }
+
+        }, null);
+        return res;
+    }
+
 
     /**
      * Calculate available permits for read entries.
@@ -601,7 +660,9 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
         msgExpired.calculateRate();
         stats.msgRateOut = msgOut.getRate();
         stats.msgThroughputOut = msgOut.getValueRate();
-        stats.msgRateExpired = msgExpired.getRate() + expiryMonitor.getMessageExpiryRate();
+        if (expiryMonitor != null) {
+            stats.msgRateExpired = msgExpired.getRate() + expiryMonitor.getMessageExpiryRate();
+        }
     }
 
     public ReplicatorStats getStats() {
@@ -639,7 +700,9 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
             // don't do anything for almost caught-up connected subscriptions
             return;
         }
-        expiryMonitor.expireMessages(messageTTLInSeconds);
+        if (expiryMonitor != null) {
+            expiryMonitor.expireMessages(messageTTLInSeconds);
+        }
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 34cad87..7eb4dd9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -217,7 +217,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             if (cursor.getName().startsWith(replicatorPrefix)) {
                 String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
                 String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
-                boolean isReplicatorStarted = addReplicationCluster(remoteCluster, this, cursor, localCluster);
+                boolean isReplicatorStarted = addReplicationCluster(remoteCluster, this, cursor.getName(), localCluster);
                 if (!isReplicatorStarted) {
                     throw new NamingException(
                             PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster);
@@ -1156,37 +1156,26 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster);
         final CompletableFuture<Void> future = new CompletableFuture<>();
 
-        String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
-        ledger.asyncOpenCursor(name, new OpenCursorCallback() {
-            @Override
-            public void openCursorComplete(ManagedCursor cursor, Object ctx) {
-                String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
-                boolean isReplicatorStarted = addReplicationCluster(remoteCluster, PersistentTopic.this, cursor, localCluster);
-                if (isReplicatorStarted) {
-                    future.complete(null);
-                } else {
-                    future.completeExceptionally(new NamingException(
-                            PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster));
-                }
-            }
-
-            @Override
-            public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
-                future.completeExceptionally(new PersistenceException(exception));
-            }
-
-        }, null);
+        String replicatorName = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
+        String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
+        boolean isReplicatorStarted = addReplicationCluster(remoteCluster, PersistentTopic.this, replicatorName, localCluster);
+        if (isReplicatorStarted) {
+            future.complete(null);
+        } else {
+            future.completeExceptionally(new NamingException(
+                PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster));
+        }
 
         return future;
     }
 
-    protected boolean addReplicationCluster(String remoteCluster, PersistentTopic persistentTopic, ManagedCursor cursor,
+    protected boolean addReplicationCluster(String remoteCluster, PersistentTopic persistentTopic, String replicatorName,
             String localCluster) {
         AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
         replicators.computeIfAbsent(remoteCluster, r -> {
             try {
-                return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster,
-                        brokerService);
+                return new PersistentReplicator(PersistentTopic.this, replicatorName, localCluster, remoteCluster,
+                        brokerService, ledger);
             } catch (NamingException e) {
                 isReplicatorStarted.set(false);
                 log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);