You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 09:18:42 UTC
[pulsar] 16/20: Handling error in creation of non-durable cursor (#7355)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4b103cd9c742392a16f815c6b709dc9491882c13
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Jun 26 21:11:40 2020 -0700
Handling error in creation of non-durable cursor (#7355)
* Handling error in creation of non-durable cursor
* Fixed tests
(cherry picked from commit 0a5f0a065e76fd6dff6243441d2cfbf25f0ee3dc)
---
.../broker/service/persistent/PersistentTopic.java | 57 +++++++++++-----------
1 file changed, 29 insertions(+), 28 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d7fcd41..ca7fbd4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -147,7 +147,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;
private static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;
-
+
// topic has every published chunked message since topic is loaded
public boolean msgChunkPublished;
@@ -688,12 +688,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
private CompletableFuture<? extends Subscription> getNonDurableSubscription(String subscriptionName,
MessageId startMessageId, long startMessageRollbackDurationSec) {
- CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
log.info("[{}][{}] Creating non-durable subscription at msg id {}", topic, subscriptionName, startMessageId);
synchronized (ledger) {
// Create a new non-durable cursor only for the first consumer that connects
- Subscription subscription = subscriptions.computeIfAbsent(subscriptionName, name -> {
+ PersistentSubscription subscription = subscriptions.get(subscriptionName);
+
+ if (subscription == null) {
MessageIdImpl msgId = startMessageId != null ? (MessageIdImpl) startMessageId
: (MessageIdImpl) MessageId.latest;
@@ -702,7 +703,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
// Ensure that the start message id starts from a valid entry.
if (ledgerId >= 0 && entryId >= 0
&& msgId instanceof BatchMessageIdImpl) {
- // When the start message is relative to a batch, we need to take one step back on the previous message,
+ // When the start message is relative to a batch, we need to take one step back on the previous
+ // message,
// because the "batch" might not have been consumed in its entirety.
// The client will then be able to discard the first messages if needed.
entryId = msgId.getEntryId() - 1;
@@ -713,32 +715,31 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
try {
cursor = ledger.newNonDurableCursor(startPosition, subscriptionName);
} catch (ManagedLedgerException e) {
- subscriptionFuture.completeExceptionally(e);
+ return FutureUtil.failedFuture(e);
}
- return new PersistentSubscription(this, subscriptionName, cursor, false);
- });
- if (!subscriptionFuture.isDone()) {
- if (startMessageRollbackDurationSec > 0) {
- long timestamp = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(startMessageRollbackDurationSec);
- subscription.resetCursor(timestamp).handle((s, ex) -> {
- if (ex != null) {
- log.warn("[{}] Failed to reset cursor {} position at timestamp {}", topic, subscriptionName,
- startMessageRollbackDurationSec);
- }
- subscriptionFuture.complete(subscription);
- return null;
- });
- } else {
- subscriptionFuture.complete(subscription);
- }
+ subscription = new PersistentSubscription(this, subscriptionName, cursor, false);
+ subscriptions.put(subscriptionName, subscription);
+ }
+
+ if (startMessageRollbackDurationSec > 0) {
+ long timestamp = System.currentTimeMillis()
+ - TimeUnit.SECONDS.toMillis(startMessageRollbackDurationSec);
+ CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
+ final Subscription finalSubscription = subscription;
+ subscription.resetCursor(timestamp).handle((s, ex) -> {
+ if (ex != null) {
+ log.warn("[{}] Failed to reset cursor {} position at timestamp {}", topic, subscriptionName,
+ startMessageRollbackDurationSec);
+ }
+ subscriptionFuture.complete(finalSubscription);
+ return null;
+ });
+ return subscriptionFuture;
} else {
- // failed to initialize managed-cursor: clean up created subscription
- subscriptions.remove(subscriptionName);
+ return CompletableFuture.completedFuture(subscription);
}
}
-
- return subscriptionFuture;
}
@Override
@@ -936,7 +937,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
public CompletableFuture<Void> close() {
return close(false);
}
-
+
/**
* Close this topic - close all producers and subscriptions associated with this topic
*
@@ -967,7 +968,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
-
+
CompletableFuture<Void> clientCloseFuture = closeWithoutWaitingClientDisconnect ? CompletableFuture.completedFuture(null)
: FutureUtil.waitForAll(futures);
@@ -1794,7 +1795,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
initializeDispatchRateLimiterIfNeeded(Optional.ofNullable(data));
-
+
this.updateMaxPublishRate(data);
producers.values().forEach(producer -> {