You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/19 00:49:17 UTC
[pulsar] 02/26: [issue #3975] Bugfix NPE on non durable consumer
(#3988)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 503114219807d2a887d73114e2b76fa82fbdf2ee
Author: Ezequiel Lovelle <ez...@gmail.com>
AuthorDate: Mon Apr 8 23:01:19 2019 -0300
[issue #3975] Bugfix NPE on non durable consumer (#3988)
*Motivation*
Trying to fix #3975
When a cursor is reset to a given timestamp on a non-durable consumer, the
message finder fails with a NullPointerException because
`cursor.getName()` returns null.
*Modifications*
- Add an overload of `newNonDurableCursor()` that takes a subscription name.
- Fix `getNonDurableSubscription` to call `newNonDurableCursor()` with the
proper subscription name.
- Add a test that reproduces the issue.
---
.../main/java/org/apache/bookkeeper/mledger/ManagedLedger.java | 1 +
.../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 ++++++++++
.../pulsar/broker/service/persistent/PersistentTopic.java | 2 +-
.../java/org/apache/pulsar/client/api/TopicReaderTest.java | 4 +++-
4 files changed, 15 insertions(+), 2 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index d6813e4..fd2f4bb 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -172,6 +172,7 @@ public interface ManagedLedger {
* @return the new NonDurableCursor
*/
ManagedCursor newNonDurableCursor(Position startCursorPosition) throws ManagedLedgerException;
+ ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName) throws ManagedLedgerException;
/**
* Delete a ManagedCursor asynchronously.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 7b20995..efa3e08 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -832,6 +832,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
@Override
+ public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cursorName)
+ throws ManagedLedgerException {
+ checkManagedLedgerIsOpen();
+ checkFenced();
+
+ return new NonDurableCursorImpl(bookKeeper, config, this, cursorName,
+ (PositionImpl) startCursorPosition);
+ }
+
+ @Override
public Iterable<ManagedCursor> getCursors() {
return cursors;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e818bf6..667bfc0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -641,7 +641,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
Position startPosition = new PositionImpl(ledgerId, entryId);
ManagedCursor cursor = null;
try {
- cursor = ledger.newNonDurableCursor(startPosition);
+ cursor = ledger.newNonDurableCursor(startPosition, subscriptionName);
} catch (ManagedLedgerException e) {
subscriptionFuture.completeExceptionally(e);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index 6318974..a4cdcd9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -34,7 +34,9 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -479,7 +481,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
assertTrue(reader.hasMessageAvailable());
String readOut = new String(reader.readNext().getData());
- assertTrue(readOut.equals(content));
+ assertEquals(content, readOut);
assertFalse(reader.hasMessageAvailable());
}