You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/03/13 06:10:42 UTC
[pulsar] 01/17: Creating a topic does not wait for creating cursor
of replicators (#6364)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 76dfd14f07409a8697cc32482df9b053b1124aaa
Author: lipenghui <pe...@apache.org>
AuthorDate: Sun Feb 23 19:43:48 2020 +0800
Creating a topic does not wait for creating cursor of replicators (#6364)
### Motivation
Creating a topic should not wait for the replicators' cursors to be created. Each replicator now opens its own cursor asynchronously when it starts its producer.
### Verifying this change
The existing unit tests cover this change.
(cherry picked from commit 336e971f4d41d6ffb26b3b53a20f36a360c070e8)
---
.../pulsar/broker/service/AbstractReplicator.java | 38 +++++++-----
.../nonpersistent/NonPersistentReplicator.java | 5 ++
.../service/persistent/PersistentReplicator.java | 71 ++++++++++++++++++++--
.../broker/service/persistent/PersistentTopic.java | 37 ++++-------
4 files changed, 108 insertions(+), 43 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index d9b2f8e..13cd091 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -122,30 +122,38 @@ public abstract class AbstractReplicator {
log.info("[{}][{} -> {}] Replicator already being started. Replicator state: {}", topicName,
localCluster, remoteCluster, state);
}
-
return;
}
log.info("[{}][{} -> {}] Starting replicator", topicName, localCluster, remoteCluster);
- producerBuilder.createAsync().thenAccept(producer -> {
- readEntries(producer);
- }).exceptionally(ex -> {
- if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) {
- long waitTimeMs = backOff.next();
- log.warn("[{}][{} -> {}] Failed to create remote producer ({}), retrying in {} s", topicName,
- localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0);
-
- // BackOff before retrying
- brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
- } else {
- log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: {}", topicName,
- localCluster, remoteCluster, STATE_UPDATER.get(this), ex);
- }
+ openCursorAsync().thenAccept(v ->
+ producerBuilder.createAsync()
+ .thenAccept(this::readEntries)
+ .exceptionally(ex -> {
+ retryCreateProducer(ex);
+ return null;
+ })).exceptionally(ex -> {
+ retryCreateProducer(ex);
return null;
});
+ }
+ private void retryCreateProducer(Throwable ex) {
+ if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) {
+ long waitTimeMs = backOff.next();
+ log.warn("[{}][{} -> {}] Failed to create remote producer ({}), retrying in {} s", topicName,
+ localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0);
+
+ // BackOff before retrying
+ brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
+ } else {
+ log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: {}", topicName,
+ localCluster, remoteCluster, STATE_UPDATER.get(this), ex);
+ }
}
+ protected abstract CompletableFuture<Void> openCursorAsync();
+
protected synchronized CompletableFuture<Void> closeProducerAsync() {
if (producer == null) {
STATE_UPDATER.set(this, State.Stopped);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index b6ea53a..c109560 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -252,6 +252,11 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
}
@Override
+ protected CompletableFuture<Void> openCursorAsync() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
public boolean isConnected() {
ProducerImpl<?> producer = this.producer;
return producer != null && producer.isConnected();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 2bc18ca..98c5d5a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service.persistent;
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
+
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
@@ -33,11 +34,13 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
+import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
@@ -46,6 +49,7 @@ import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
+import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
@@ -55,6 +59,7 @@ import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.SendCallback;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.api.proto.PulsarMarkers.MarkerType;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
@@ -65,7 +70,9 @@ import org.slf4j.LoggerFactory;
public class PersistentReplicator extends AbstractReplicator implements Replicator, ReadEntriesCallback, DeleteCallback {
private final PersistentTopic topic;
- private final ManagedCursor cursor;
+ private final String replicatorName;
+ private final ManagedLedger ledger;
+ protected ManagedCursor cursor;
private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
@@ -97,11 +104,14 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
private final ReplicatorStats stats = new ReplicatorStats();
+ // Only for test
public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster,
BrokerService brokerService) throws NamingException {
super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);
- this.topic = topic;
+ this.replicatorName = cursor.getName();
+ this.ledger = cursor.getManagedLedger();
this.cursor = cursor;
+ this.topic = topic;
this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor);
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
PENDING_MESSAGES_UPDATER.set(this, 0);
@@ -116,6 +126,25 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
startProducer();
}
+ public PersistentReplicator(PersistentTopic topic, String replicatorName, String localCluster, String remoteCluster,
+ BrokerService brokerService, ManagedLedger ledger) throws NamingException {
+ super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);
+ this.replicatorName = replicatorName;
+ this.ledger = ledger;
+ this.topic = topic;
+ HAVE_PENDING_READ_UPDATER.set(this, FALSE);
+ PENDING_MESSAGES_UPDATER.set(this, 0);
+
+ readBatchSize = Math.min(
+ producerQueueSize,
+ topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize());
+ producerQueueThreshold = (int) (producerQueueSize * 0.9);
+
+ this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
+
+ startProducer();
+ }
+
@Override
protected void readEntries(org.apache.pulsar.client.api.Producer<byte[]> producer) {
// Rewind the cursor to be sure to read again all non-acked messages sent while restarting
@@ -158,6 +187,36 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
this.cursor.setInactive();
}
+ @Override
+ protected synchronized CompletableFuture<Void> openCursorAsync() {
+ log.info("[{}][{} -> {}] Starting open cursor for replicator", topicName, localCluster, remoteCluster);
+ if (cursor != null) {
+ log.info("[{}][{} -> {}] Using the exists cursor for replicator", topicName, localCluster, remoteCluster);
+ if (expiryMonitor == null) {
+ this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+ CompletableFuture<Void> res = new CompletableFuture<>();
+ ledger.asyncOpenCursor(replicatorName, InitialPosition.Earliest, new OpenCursorCallback() {
+ @Override
+ public void openCursorComplete(ManagedCursor cursor, Object ctx) {
+ log.info("[{}][{} -> {}] Open cursor succeed for replicator", topicName, localCluster, remoteCluster);
+ PersistentReplicator.this.cursor = cursor;
+ PersistentReplicator.this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor);
+ res.complete(null);
+ }
+
+ @Override
+ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
+ log.warn("[{}][{} -> {}] Open cursor failed for replicator", topicName, localCluster, remoteCluster, exception);
+ res.completeExceptionally(new PersistenceException(exception));
+ }
+
+ }, null);
+ return res;
+ }
+
/**
* Calculate available permits for read entries.
@@ -601,7 +660,9 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
msgExpired.calculateRate();
stats.msgRateOut = msgOut.getRate();
stats.msgThroughputOut = msgOut.getValueRate();
- stats.msgRateExpired = msgExpired.getRate() + expiryMonitor.getMessageExpiryRate();
+ if (expiryMonitor != null) {
+ stats.msgRateExpired = msgExpired.getRate() + expiryMonitor.getMessageExpiryRate();
+ }
}
public ReplicatorStats getStats() {
@@ -639,7 +700,9 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
// don't do anything for almost caught-up connected subscriptions
return;
}
- expiryMonitor.expireMessages(messageTTLInSeconds);
+ if (expiryMonitor != null) {
+ expiryMonitor.expireMessages(messageTTLInSeconds);
+ }
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 34cad87..7eb4dd9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -217,7 +217,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
if (cursor.getName().startsWith(replicatorPrefix)) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
- boolean isReplicatorStarted = addReplicationCluster(remoteCluster, this, cursor, localCluster);
+ boolean isReplicatorStarted = addReplicationCluster(remoteCluster, this, cursor.getName(), localCluster);
if (!isReplicatorStarted) {
throw new NamingException(
PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster);
@@ -1156,37 +1156,26 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster);
final CompletableFuture<Void> future = new CompletableFuture<>();
- String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
- ledger.asyncOpenCursor(name, new OpenCursorCallback() {
- @Override
- public void openCursorComplete(ManagedCursor cursor, Object ctx) {
- String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
- boolean isReplicatorStarted = addReplicationCluster(remoteCluster, PersistentTopic.this, cursor, localCluster);
- if (isReplicatorStarted) {
- future.complete(null);
- } else {
- future.completeExceptionally(new NamingException(
- PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster));
- }
- }
-
- @Override
- public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
- future.completeExceptionally(new PersistenceException(exception));
- }
-
- }, null);
+ String replicatorName = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
+ String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
+ boolean isReplicatorStarted = addReplicationCluster(remoteCluster, PersistentTopic.this, replicatorName, localCluster);
+ if (isReplicatorStarted) {
+ future.complete(null);
+ } else {
+ future.completeExceptionally(new NamingException(
+ PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster));
+ }
return future;
}
- protected boolean addReplicationCluster(String remoteCluster, PersistentTopic persistentTopic, ManagedCursor cursor,
+ protected boolean addReplicationCluster(String remoteCluster, PersistentTopic persistentTopic, String replicatorName,
String localCluster) {
AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
replicators.computeIfAbsent(remoteCluster, r -> {
try {
- return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster,
- brokerService);
+ return new PersistentReplicator(PersistentTopic.this, replicatorName, localCluster, remoteCluster,
+ brokerService, ledger);
} catch (NamingException e) {
isReplicatorStarted.set(false);
log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);