You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/11/28 22:12:58 UTC
[pulsar] branch master updated: Remove cursor while remove
non-durable subscription (#5719)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2e30c08 Remove cursor while remove non-durable subscription (#5719)
2e30c08 is described below
commit 2e30c086b2461531c62164d09ea148928a0e3ae6
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Nov 29 06:12:49 2019 +0800
Remove cursor while remove non-durable subscription (#5719)
### Motivation
Remove cursor from cursors of managed ledger while remove non-durable subscription. The data deletion is depends the mark delete position of all cursors, if left a unused cursor in the cursors of managed ledger, data can't be delete as expected.
### Modifications
Remove cursor while remove non-durable subscription
---
.../AbstractDispatcherSingleActiveConsumer.java | 4 +-
.../org/apache/pulsar/broker/service/Consumer.java | 12 +++++-
.../apache/pulsar/broker/service/Dispatcher.java | 6 ++-
.../apache/pulsar/broker/service/Subscription.java | 6 ++-
.../nonpersistent/NonPersistentDispatcher.java | 4 +-
.../NonPersistentDispatcherMultipleConsumers.java | 2 +-
.../nonpersistent/NonPersistentSubscription.java | 2 +-
.../PersistentDispatcherMultipleConsumers.java | 4 +-
.../service/persistent/PersistentSubscription.java | 14 ++++++-
.../org/apache/pulsar/client/impl/ReaderTest.java | 44 +++++++++++++++++++---
10 files changed, 79 insertions(+), 19 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 02fcea1..509c3e3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -219,11 +219,11 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
*
* @return
*/
- public synchronized CompletableFuture<Void> disconnectAllConsumers() {
+ public synchronized CompletableFuture<Void> disconnectAllConsumers(boolean isResetCursor) {
closeFuture = new CompletableFuture<>();
if (!consumers.isEmpty()) {
- consumers.forEach(Consumer::disconnect);
+ consumers.forEach(consumer -> consumer.disconnect(isResetCursor));
cancelPendingRead();
} else {
// no consumer connected, complete disconnect immediately
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index fa45eb4..8c0e1d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -293,15 +293,23 @@ public class Consumer {
* pending message acks
*/
public void close() throws BrokerServiceException {
- subscription.removeConsumer(this);
+ close(false);
+ }
+
+ public void close(boolean isResetCursor) throws BrokerServiceException {
+ subscription.removeConsumer(this, isResetCursor);
cnx.removedConsumer(this);
}
public void disconnect() {
+ disconnect(false);
+ }
+
+ public void disconnect(boolean isResetCursor) {
log.info("Disconnecting consumer: {}", this);
cnx.closeConsumer(this);
try {
- close();
+ close(isResetCursor);
} catch (BrokerServiceException e) {
log.warn("Consumer {} was already closed: {}", this, e.getMessage(), e);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index 24118ae..cffa024 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -60,7 +60,11 @@ public interface Dispatcher {
*
* @return
*/
- CompletableFuture<Void> disconnectAllConsumers();
+ CompletableFuture<Void> disconnectAllConsumers(boolean isResetCursor);
+
+ default CompletableFuture<Void> disconnectAllConsumers() {
+ return disconnectAllConsumers(false);
+ }
void resetCloseFuture();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index c044fa5..18c7c49 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -37,7 +37,11 @@ public interface Subscription {
void addConsumer(Consumer consumer) throws BrokerServiceException;
- void removeConsumer(Consumer consumer) throws BrokerServiceException;
+ default void removeConsumer(Consumer consumer) throws BrokerServiceException {
+ removeConsumer(consumer, false);
+ }
+
+ void removeConsumer(Consumer consumer, boolean isResetCursor) throws BrokerServiceException;
void consumerFlow(Consumer consumer, int additionalNumberOfMessages);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcher.java
index 9d5694d..fc8fdbc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcher.java
@@ -30,7 +30,7 @@ import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
-public interface NonPersistentDispatcher extends Dispatcher{
+public interface NonPersistentDispatcher extends Dispatcher {
void addConsumer(Consumer consumer) throws BrokerServiceException;
@@ -44,7 +44,7 @@ public interface NonPersistentDispatcher extends Dispatcher{
CompletableFuture<Void> close() ;
- CompletableFuture<Void> disconnectAllConsumers();
+ CompletableFuture<Void> disconnectAllConsumers(boolean isResetCursor);
void reset();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 5d569a1..5375f39 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -164,7 +164,7 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
}
@Override
- public synchronized CompletableFuture<Void> disconnectAllConsumers() {
+ public synchronized CompletableFuture<Void> disconnectAllConsumers(boolean isResetCursor) {
closeFuture = new CompletableFuture<>();
if (consumerList.isEmpty()) {
closeFuture.complete(null);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index e8babda..6520420 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -165,7 +165,7 @@ public class NonPersistentSubscription implements Subscription {
}
@Override
- public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
+ public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor) throws BrokerServiceException {
if (dispatcher != null) {
dispatcher.removeConsumer(consumer);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index e5a8b64..5d536c1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -390,12 +390,12 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
}
@Override
- public synchronized CompletableFuture<Void> disconnectAllConsumers() {
+ public synchronized CompletableFuture<Void> disconnectAllConsumers(boolean isResetCursor) {
closeFuture = new CompletableFuture<>();
if (consumerList.isEmpty()) {
closeFuture.complete(null);
} else {
- consumerList.forEach(Consumer::disconnect);
+ consumerList.forEach(consumer -> consumer.disconnect(isResetCursor));
if (havePendingRead && cursor.cancelPendingReadRequest()) {
havePendingRead = false;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index c3bdd24..200d06e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -44,6 +44,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentFindCursorPositionException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
@@ -256,7 +257,7 @@ public class PersistentSubscription implements Subscription {
}
@Override
- public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
+ public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor) throws BrokerServiceException {
cursor.updateLastActive();
if (dispatcher != null) {
dispatcher.removeConsumer(consumer);
@@ -286,6 +287,15 @@ public class PersistentSubscription implements Subscription {
// topic.remove again try to access same map which creates deadlock. so, execute it in different thread.
topic.getBrokerService().pulsar().getExecutor().submit(() ->{
topic.removeSubscription(subName);
+ // Also need remove the cursor here, otherwise the data deletion will not work well.
+ // Because data deletion depends on the mark delete position of all cursors.
+ if (!isResetCursor) {
+ try {
+ topic.getManagedLedger().deleteCursor(cursor.getName());
+ } catch (InterruptedException | ManagedLedgerException e) {
+ log.warn("[{}] [{}] Failed to remove non durable cursor", topic.getName(), subName, e);
+ }
+ }
});
}
}
@@ -678,7 +688,7 @@ public class PersistentSubscription implements Subscription {
// Lock the Subscription object before locking the Dispatcher object to avoid deadlocks
synchronized (this) {
if (dispatcher != null && dispatcher.isConsumerConnected()) {
- disconnectFuture = dispatcher.disconnectAllConsumers();
+ disconnectFuture = dispatcher.disconnectAllConsumers(true);
} else {
disconnectFuture = CompletableFuture.completedFuture(null);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 3eece11..b682c60 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -21,18 +21,15 @@ package org.apache.pulsar.client.impl;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
+import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata.Builder;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
@@ -45,6 +42,7 @@ import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -252,4 +250,40 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
assertFalse(reader.hasMessageAvailable());
}
+
+ /**
+ * We need to ensure that delete subscription of read also need to delete the
+ * non-durable cursor, because data deletion depends on the mark delete position of all cursors.
+ */
+ @Test
+ public void testRemoveSubscriptionForReaderNeedRemoveCursor() throws IOException, PulsarAdminException {
+
+ final String topic = "persistent://my-property/my-ns/testRemoveSubscriptionForReaderNeedRemoveCursor";
+
+ @Cleanup
+ Reader<byte[]> reader1 = pulsarClient.newReader()
+ .topic(topic)
+ .startMessageId(MessageId.earliest)
+ .create();
+
+ @Cleanup
+ Reader<byte[]> reader2 = pulsarClient.newReader()
+ .topic(topic)
+ .startMessageId(MessageId.earliest)
+ .create();
+
+ Assert.assertEquals(admin.topics().getStats(topic).subscriptions.size(), 2);
+ Assert.assertEquals(admin.topics().getInternalStats(topic).cursors.size(), 2);
+
+ reader1.close();
+
+ Assert.assertEquals(admin.topics().getStats(topic).subscriptions.size(), 1);
+ Assert.assertEquals(admin.topics().getInternalStats(topic).cursors.size(), 1);
+
+ reader2.close();
+
+ Assert.assertEquals(admin.topics().getStats(topic).subscriptions.size(), 0);
+ Assert.assertEquals(admin.topics().getInternalStats(topic).cursors.size(), 0);
+
+ }
}