You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/19 00:49:17 UTC

[pulsar] 02/26: [issue #3975] Bugfix NPE on non durable consumer (#3988)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 503114219807d2a887d73114e2b76fa82fbdf2ee
Author: Ezequiel Lovelle <ez...@gmail.com>
AuthorDate: Mon Apr 8 23:01:19 2019 -0300

    [issue #3975] Bugfix NPE on non durable consumer (#3988)
    
    *Motivation*
    
    Trying to fix #3975
    
    When a cursor reset is performed with a timestamp on a non-durable
    consumer, the message finder fails with a NullPointerException because
    `cursor.getName()` is null.
    
    *Modifications*
    
      - Add an overload of `newNonDurableCursor()` that takes a subscription name.
      - Fix method getNonDurableSubscription to call `newNonDurableCursor()` with
        the proper subscription name.
      - Add a test to assert the issue.
---
 .../main/java/org/apache/bookkeeper/mledger/ManagedLedger.java |  1 +
 .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java  | 10 ++++++++++
 .../pulsar/broker/service/persistent/PersistentTopic.java      |  2 +-
 .../java/org/apache/pulsar/client/api/TopicReaderTest.java     |  4 +++-
 4 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index d6813e4..fd2f4bb 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -172,6 +172,7 @@ public interface ManagedLedger {
      * @return the new NonDurableCursor
      */
     ManagedCursor newNonDurableCursor(Position startCursorPosition) throws ManagedLedgerException;
+    ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName) throws ManagedLedgerException;
 
     /**
      * Delete a ManagedCursor asynchronously.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 7b20995..efa3e08 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -832,6 +832,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     @Override
+    public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cursorName)
+            throws ManagedLedgerException {
+        checkManagedLedgerIsOpen();
+        checkFenced();
+
+        return new NonDurableCursorImpl(bookKeeper, config, this, cursorName,
+                (PositionImpl) startCursorPosition);
+    }
+
+    @Override
     public Iterable<ManagedCursor> getCursors() {
         return cursors;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e818bf6..667bfc0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -641,7 +641,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
             Position startPosition = new PositionImpl(ledgerId, entryId);
             ManagedCursor cursor = null;
             try {
-                cursor = ledger.newNonDurableCursor(startPosition);
+                cursor = ledger.newNonDurableCursor(startPosition, subscriptionName);
             } catch (ManagedLedgerException e) {
                 subscriptionFuture.completeExceptionally(e);
             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index 6318974..a4cdcd9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -34,7 +34,9 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.ReaderImpl;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.RelativeTimeUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -479,7 +481,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
             assertTrue(reader.hasMessageAvailable());
 
             String readOut = new String(reader.readNext().getData());
-            assertTrue(readOut.equals(content));
+            assertEquals(content, readOut);
             assertFalse(reader.hasMessageAvailable());
         }