You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 09:18:42 UTC

[pulsar] 16/20: Handling error in creation of non-durable cursor (#7355)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4b103cd9c742392a16f815c6b709dc9491882c13
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Jun 26 21:11:40 2020 -0700

    Handling error in creation of non-durable cursor (#7355)
    
    * Handling error in creation of non-durable cursor
    
    * Fixed tests
    
    (cherry picked from commit 0a5f0a065e76fd6dff6243441d2cfbf25f0ee3dc)
---
 .../broker/service/persistent/PersistentTopic.java | 57 +++++++++++-----------
 1 file changed, 29 insertions(+), 28 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d7fcd41..ca7fbd4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -147,7 +147,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;
 
     private static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;
-    
+
     // topic has every published chunked message since topic is loaded
     public boolean msgChunkPublished;
 
@@ -688,12 +688,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
     private CompletableFuture<? extends Subscription> getNonDurableSubscription(String subscriptionName,
             MessageId startMessageId, long startMessageRollbackDurationSec) {
-        CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
         log.info("[{}][{}] Creating non-durable subscription at msg id {}", topic, subscriptionName, startMessageId);
 
         synchronized (ledger) {
             // Create a new non-durable cursor only for the first consumer that connects
-            Subscription subscription = subscriptions.computeIfAbsent(subscriptionName, name -> {
+            PersistentSubscription subscription = subscriptions.get(subscriptionName);
+
+            if (subscription == null) {
                 MessageIdImpl msgId = startMessageId != null ? (MessageIdImpl) startMessageId
                         : (MessageIdImpl) MessageId.latest;
 
@@ -702,7 +703,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 // Ensure that the start message id starts from a valid entry.
                 if (ledgerId >= 0 && entryId >= 0
                         && msgId instanceof BatchMessageIdImpl) {
-                    // When the start message is relative to a batch, we need to take one step back on the previous message,
+                    // When the start message is relative to a batch, we need to take one step back
+                    // to the previous message,
                     // because the "batch" might not have been consumed in its entirety.
                     // The client will then be able to discard the first messages if needed.
                     entryId = msgId.getEntryId() - 1;
@@ -713,32 +715,31 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 try {
                     cursor = ledger.newNonDurableCursor(startPosition, subscriptionName);
                 } catch (ManagedLedgerException e) {
-                    subscriptionFuture.completeExceptionally(e);
+                    return FutureUtil.failedFuture(e);
                 }
-            return new PersistentSubscription(this, subscriptionName, cursor, false);
-            });
 
-            if (!subscriptionFuture.isDone()) {
-                if (startMessageRollbackDurationSec > 0) {
-                    long timestamp = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(startMessageRollbackDurationSec);
-                    subscription.resetCursor(timestamp).handle((s, ex) -> {
-                        if (ex != null) {
-                            log.warn("[{}] Failed to reset cursor {} position at timestamp {}", topic, subscriptionName,
-                                    startMessageRollbackDurationSec);
-                        }
-                        subscriptionFuture.complete(subscription);
-                        return null;
-                    });
-                } else {
-                    subscriptionFuture.complete(subscription);
-                }
+                subscription = new PersistentSubscription(this, subscriptionName, cursor, false);
+                subscriptions.put(subscriptionName, subscription);
+            }
+
+            if (startMessageRollbackDurationSec > 0) {
+                long timestamp = System.currentTimeMillis()
+                        - TimeUnit.SECONDS.toMillis(startMessageRollbackDurationSec);
+                CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
+                final Subscription finalSubscription = subscription;
+                subscription.resetCursor(timestamp).handle((s, ex) -> {
+                    if (ex != null) {
+                        log.warn("[{}] Failed to reset cursor {} position at timestamp {}", topic, subscriptionName,
+                                startMessageRollbackDurationSec);
+                    }
+                    subscriptionFuture.complete(finalSubscription);
+                    return null;
+                });
+                return subscriptionFuture;
             } else {
-                // failed to initialize managed-cursor: clean up created subscription
-                subscriptions.remove(subscriptionName);
+                return CompletableFuture.completedFuture(subscription);
             }
         }
-
-        return subscriptionFuture;
     }
 
     @Override
@@ -936,7 +937,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     public CompletableFuture<Void> close() {
         return close(false);
     }
-    
+
     /**
      * Close this topic - close all producers and subscriptions associated with this topic
      *
@@ -967,7 +968,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
         producers.values().forEach(producer -> futures.add(producer.disconnect()));
         subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
-        
+
         CompletableFuture<Void> clientCloseFuture = closeWithoutWaitingClientDisconnect ? CompletableFuture.completedFuture(null)
                 : FutureUtil.waitForAll(futures);
 
@@ -1794,7 +1795,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         }
 
         initializeDispatchRateLimiterIfNeeded(Optional.ofNullable(data));
-        
+
         this.updateMaxPublishRate(data);
 
         producers.values().forEach(producer -> {