Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/06/24 23:59:20 UTC

[GitHub] [pulsar] merlimat opened a new pull request #7355: Handling error in creation of non-durable cursor

merlimat opened a new pull request #7355:
URL: https://github.com/apache/pulsar/pull/7355


   ### Motivation
   
   We're getting an NPE when the creation of a non-durable cursor fails.
   
   ```
   2020-06-20 00:00:23.872000+00:00 [WARN ] [he.pulsar.broker.service.ServerCnx]  [/127.0.0.1:47412][persistent://TOPIC][reader-7369753eb6] Failed to create consumer: null
           java.util.concurrent.CompletionException: java.lang.NullPointerException
           at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) [?:?]
           at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1113) [?:?]
           at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2235) [?:?]
           at org.apache.pulsar.broker.service.ServerCnx.lambda$null$14(ServerCnx.java:802) [org.apache.pulsar-pulsar-broker-2.5.0.SPLK.6584581.jar:2.5.0.SPLK.6584581]
           at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) [?:?]
           at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) [?:?]
           at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) [?:?]
           at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085) [?:?]
           at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) [?:?]
           at java.lang.Thread.run(Thread.java:834) [?:?]
   Caused by: java.lang.NullPointerException
           at org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor.<init>(PersistentMessageExpiryMonitor.java:56) ~[org.apache.pulsar-pulsar-broker-2.5.0.SPLK.6584581.jar:2.5.0.SPLK.6584581]
           at org.apache.pulsar.broker.service.persistent.PersistentSubscription.<init>(PersistentSubscription.java:148) ~[org.apache.pulsar-pulsar-broker-2.5.0.SPLK.6584581.jar:2.5.0.SPLK.6584581]
           at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$getNonDurableSubscription$13(PersistentTopic.java:695) ~[org.apache.pulsar-pulsar-broker-2.5.0.SPLK.6584581.jar:2.5.0.SPLK.6584581]
           at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:274) ~[org.apache.pulsar-pulsar-common-2.5.0.SPLK.6584581.jar:2.5.0.SPLK.6584581]
           at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129) ~[org.apache.pulsar-pulsar-common-2.5.0.SPLK.6584581.jar:2.5.0.SPLK.6584581]
           at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNonDurableSubscription(PersistentTopic.java:673) ~[org.apache.pulsar-pulsar-broker-2.5.0.SPLK.6584581.jar:2.5.0.SPLK.6584581]
           at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:581) ~[org.apache.pulsar-pulsar-broker-2.5.0.SPLK.6584581.jar:2.5.0.SPLK.6584581]
           at org.apache.pulsar.broker.service.ServerCnx.lambda$null$11(ServerCnx.java:817) [org.apache.pulsar-pulsar-broker-2.5.0.SPLK.6584581.jar:2.5.0.SPLK.6584581]
           at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1106) ~[?:?]
           ... 8 more
   ```
   
   The reason is that we fail the future but then go on to create the subscription instance anyway, with a `null` cursor:
   
   ```java
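   try {
       cursor = ledger.newNonDurableCursor(startPosition, subscriptionName);
   } catch (ManagedLedgerException e) {
       // The future is failed here, but execution falls through...
       subscriptionFuture.completeExceptionally(e);
   }
   
   // ...so this still runs with a null cursor and throws the NPE shown above.
   return new PersistentSubscription(this, subscriptionName, cursor, false);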
   ```
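   A minimal sketch of the control flow the fix needs. It uses simplified, hypothetical stand-ins (`Cursor`, `Subscription`, `CursorCreationException`, and a `newNonDurableCursor` that fails on demand) rather than Pulsar's real `ManagedLedger` and `PersistentSubscription` types. Once the future has been completed exceptionally, the method returns instead of constructing the subscription. This illustrates the pattern only and is not the code merged in this PR.
   
   ```java
   import java.util.concurrent.CompletableFuture;
   
   public class NonDurableSubscriptionSketch {
   
       // Hypothetical stand-in for ManagedLedgerException.
       static class CursorCreationException extends Exception {
           CursorCreationException(String message) {
               super(message);
           }
       }
   
       // Hypothetical stand-in for a managed cursor.
       static final class Cursor {
           final String name;
           Cursor(String name) {
               this.name = name;
           }
       }
   
       // Hypothetical stand-in for PersistentSubscription. As in the stack
       // trace, it cannot be constructed around a null cursor.
       static final class Subscription {
           final String name;
           final Cursor cursor;
           Subscription(String name, Cursor cursor) {
               if (cursor == null) {
                   throw new NullPointerException("cursor");
               }
               this.name = name;
               this.cursor = cursor;
           }
       }
   
       // Hypothetical cursor factory that fails when asked to.
       static Cursor newNonDurableCursor(String name, boolean fail) throws CursorCreationException {
           if (fail) {
               throw new CursorCreationException("cannot open non-durable cursor for " + name);
           }
           return new Cursor(name);
       }
   
       static CompletableFuture<Subscription> getNonDurableSubscription(String name, boolean fail) {
           CompletableFuture<Subscription> future = new CompletableFuture<>();
           Cursor cursor;
           try {
               cursor = newNonDurableCursor(name, fail);
           } catch (CursorCreationException e) {
               // Fail the future and stop here instead of falling through
               // to build a subscription with a null cursor.
               future.completeExceptionally(e);
               return future;
           }
           future.complete(new Subscription(name, cursor));
           return future;
       }
   
       public static void main(String[] args) {
           CompletableFuture<Subscription> ok = getNonDurableSubscription("reader-1", false);
           CompletableFuture<Subscription> failed = getNonDurableSubscription("reader-2", true);
           System.out.println("ok: " + ok.join().name);
           System.out.println("failed exceptionally: " + failed.isCompletedExceptionally());
       }
   }
   ```
   
   With the early `return`, a failed cursor creation reaches the caller as a failed future that carries the cursor exception, not as an NPE from the subscription constructor. On Java 9+, `CompletableFuture.failedFuture(e)` is a compact alternative to creating and then failing the future by hand.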
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat merged pull request #7355: Handling error in creation of non-durable cursor

Posted by GitBox <gi...@apache.org>.
merlimat merged pull request #7355:
URL: https://github.com/apache/pulsar/pull/7355


   





[GitHub] [pulsar] michaeljmarshall commented on a change in pull request #7355: Handling error in creation of non-durable cursor

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on a change in pull request #7355:
URL: https://github.com/apache/pulsar/pull/7355#discussion_r639953231



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -688,12 +688,13 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
 
     private CompletableFuture<? extends Subscription> getNonDurableSubscription(String subscriptionName,
             MessageId startMessageId, long startMessageRollbackDurationSec) {
-        CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
         log.info("[{}][{}] Creating non-durable subscription at msg id {}", topic, subscriptionName, startMessageId);
 
         synchronized (ledger) {
             // Create a new non-durable cursor only for the first consumer that connects
-            Subscription subscription = subscriptions.computeIfAbsent(subscriptionName, name -> {

Review comment:
       @merlimat - I know this was merged a while ago, but while reading through this class today I became concerned about a possible race condition that could lead to an incorrect update of the `subscriptions` map.
   
   With this PR, the `subscriptions` map is no longer updated here with the `computeIfAbsent` method. Instead, it is modified with a `get` followed by a separate `put`. The `getDurableSubscription` method also updates the `subscriptions` map, and it can do so concurrently. I believe this opens us up to a race under the following conditions:
   
   1. We call `get` for the subscription on line 695 and receive `null`.
   2. A durable subscription with the same name is then added to the `subscriptions` map on line 665.
   3. The non-durable subscription is then put into the `subscriptions` map on new line 722, overwriting the durable subscription of the same name.
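   
   The interleaving in steps 1 to 3 can be replayed deterministically on a single thread. The sketch below is illustrative only. It uses `java.util.concurrent.ConcurrentHashMap` in place of Pulsar's `ConcurrentOpenHashMap`, and a hypothetical `Sub` class in place of the real subscription types. It shows that a separate `get` and `put` lets the non-durable entry overwrite the durable one. The atomic `putIfAbsent`, one possible remedy alongside `computeIfAbsent`, leaves the durable entry in place.
   
   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   
   public class SubscriptionMapRaceSketch {
   
       // Hypothetical stand-in for a subscription; only records whether it is durable.
       static final class Sub {
           final String name;
           final boolean durable;
           Sub(String name, boolean durable) {
               this.name = name;
               this.durable = durable;
           }
       }
   
       // Replays steps 1-3 in a fixed order using a non-atomic get followed by put.
       static Sub replayGetThenPut(Map<String, Sub> subscriptions, String name) {
           Sub existing = subscriptions.get(name);            // step 1: non-durable path sees null
           subscriptions.put(name, new Sub(name, true));      // step 2: durable path adds its entry
           if (existing == null) {
               subscriptions.put(name, new Sub(name, false)); // step 3: stale check overwrites it
           }
           return subscriptions.get(name);
       }
   
       // Same interleaving, but step 3 uses the atomic putIfAbsent.
       static Sub replayPutIfAbsent(Map<String, Sub> subscriptions, String name) {
           Sub existing = subscriptions.get(name);            // step 1
           subscriptions.put(name, new Sub(name, true));      // step 2
           if (existing == null) {
               // step 3: no-op, because the key is already present
               subscriptions.putIfAbsent(name, new Sub(name, false));
           }
           return subscriptions.get(name);
       }
   
       public static void main(String[] args) {
           Sub racy = replayGetThenPut(new ConcurrentHashMap<>(), "sub");
           Sub safe = replayPutIfAbsent(new ConcurrentHashMap<>(), "sub");
           System.out.println("get+put leaves durable=" + racy.durable);
           System.out.println("putIfAbsent leaves durable=" + safe.durable);
       }
   }
   ```
   
   A real fix would also need to inspect the value that `putIfAbsent` returns and decide what to do with the non-durable cursor it had just opened. This sketch only shows which entry ends up in the map.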
   
   Do you agree this is a risk here? If so, I will work on contributing a fix. I wanted to check first though, just in case there is context I'm missing. Thanks!



