You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/03 09:52:58 UTC
[pulsar] branch master updated: Only close active consumer for
Failover subscription when seek(). (#7141)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new dbe5903 Only close active consumer for Failover subscription when seek(). (#7141)
dbe5903 is described below
commit dbe5903a08fb64975ad0aece5fc857aa529ef1d1
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Jun 3 17:52:38 2020 +0800
Only close active consumer for Failover subscription when seek(). (#7141)
Related to #5278
### Motivation
Only close active consumer for Failover subscription when seek().
### Verifying this change
Unit tests added
### Does this pull request potentially affect one of the following parts:
*If `yes` was chosen, please highlight the changes*
- Dependencies (does it add or upgrade a dependency): ( no)
- The public API: (no)
- The schema: (no)
- The default values of configurations: (no)
- The wire protocol: (no)
- The rest endpoints: (no)
- The admin cli options: (no)
- Anything that affects deployment: (no)
### Documentation
- Does this pull request introduce a new feature? (no)
---
.../ci-integration-backwards-compatibility.yaml | 2 +-
.github/workflows/ci-integration-cli.yaml | 2 +-
.../workflows/ci-integration-function-state.yaml | 2 +-
.github/workflows/ci-integration-messaging.yaml | 2 +-
.github/workflows/ci-integration-schema.yaml | 2 +-
.github/workflows/ci-integration-sql.yaml | 2 +-
.github/workflows/ci-integration-standalone.yaml | 2 +-
.../ci-integration-tiered-filesystem.yaml | 2 +-
.../workflows/ci-integration-tiered-jcloud.yaml | 2 +-
.../AbstractDispatcherSingleActiveConsumer.java | 9 +++
.../apache/pulsar/broker/service/Dispatcher.java | 5 ++
.../NonPersistentDispatcherMultipleConsumers.java | 5 ++
.../PersistentDispatcherMultipleConsumers.java | 5 ++
.../service/persistent/PersistentSubscription.java | 2 +-
.../PrecisTopicPublishRateThrottleTest.java | 6 +-
.../broker/service/SubscriptionSeekTest.java | 83 ++++++++++++++++++++++
.../SystemTopicBasedTopicPoliciesServiceTest.java | 12 ++--
.../pulsar/io/batch/BatchSourceExecutor.java | 2 +-
18 files changed, 126 insertions(+), 21 deletions(-)
diff --git a/.github/workflows/ci-integration-backwards-compatibility.yaml b/.github/workflows/ci-integration-backwards-compatibility.yaml
index c9a5ced..58fba8c 100644
--- a/.github/workflows/ci-integration-backwards-compatibility.yaml
+++ b/.github/workflows/ci-integration-backwards-compatibility.yaml
@@ -53,7 +53,7 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff -a
- sudo rm -f /swapfile
+ sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
diff --git a/.github/workflows/ci-integration-cli.yaml b/.github/workflows/ci-integration-cli.yaml
index f74510b..4ec3991 100644
--- a/.github/workflows/ci-integration-cli.yaml
+++ b/.github/workflows/ci-integration-cli.yaml
@@ -53,7 +53,7 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff -a
- sudo rm -f /swapfile
+ sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
diff --git a/.github/workflows/ci-integration-function-state.yaml b/.github/workflows/ci-integration-function-state.yaml
index 1574f7e..beb4b7b 100644
--- a/.github/workflows/ci-integration-function-state.yaml
+++ b/.github/workflows/ci-integration-function-state.yaml
@@ -53,7 +53,7 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff -a
- sudo rm -f /swapfile
+ sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
diff --git a/.github/workflows/ci-integration-messaging.yaml b/.github/workflows/ci-integration-messaging.yaml
index 7db4a1b..907ba2d 100644
--- a/.github/workflows/ci-integration-messaging.yaml
+++ b/.github/workflows/ci-integration-messaging.yaml
@@ -53,7 +53,7 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff -a
- sudo rm -f /swapfile
+ sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
diff --git a/.github/workflows/ci-integration-schema.yaml b/.github/workflows/ci-integration-schema.yaml
index c21dfe7..f563b7a 100644
--- a/.github/workflows/ci-integration-schema.yaml
+++ b/.github/workflows/ci-integration-schema.yaml
@@ -53,7 +53,7 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff -a
- sudo rm -f /swapfile
+ sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
diff --git a/.github/workflows/ci-integration-sql.yaml b/.github/workflows/ci-integration-sql.yaml
index daefeda..5b4b071 100644
--- a/.github/workflows/ci-integration-sql.yaml
+++ b/.github/workflows/ci-integration-sql.yaml
@@ -53,7 +53,7 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff -a
- sudo rm -f /swapfile
+ sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
diff --git a/.github/workflows/ci-integration-standalone.yaml b/.github/workflows/ci-integration-standalone.yaml
index 2b6a231..e6c6a21 100644
--- a/.github/workflows/ci-integration-standalone.yaml
+++ b/.github/workflows/ci-integration-standalone.yaml
@@ -53,7 +53,7 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff -a
- sudo rm -f /swapfile
+ sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
diff --git a/.github/workflows/ci-integration-tiered-filesystem.yaml b/.github/workflows/ci-integration-tiered-filesystem.yaml
index 472cbce..95e3c4e 100644
--- a/.github/workflows/ci-integration-tiered-filesystem.yaml
+++ b/.github/workflows/ci-integration-tiered-filesystem.yaml
@@ -53,7 +53,7 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff -a
- sudo rm -f /swapfile
+ sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
diff --git a/.github/workflows/ci-integration-tiered-jcloud.yaml b/.github/workflows/ci-integration-tiered-jcloud.yaml
index 1594452..36042e3 100644
--- a/.github/workflows/ci-integration-tiered-jcloud.yaml
+++ b/.github/workflows/ci-integration-tiered-jcloud.yaml
@@ -53,7 +53,7 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff -a
- sudo rm -f /swapfile
+ sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index a093d07..6c5f8a7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -245,6 +245,15 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
return closeFuture;
}
+ public synchronized CompletableFuture<Void> disconnectActiveConsumers(boolean isResetCursor) {
+ closeFuture = new CompletableFuture<>();
+ if (activeConsumer != null) {
+ activeConsumer.disconnect(isResetCursor);
+ }
+ closeFuture.complete(null);
+ return closeFuture;
+ }
+
@Override
public synchronized void resetCloseFuture() {
closeFuture = null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index cffa024..7b789e6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -56,6 +56,11 @@ public interface Dispatcher {
boolean isClosed();
/**
+ * Disconnect active consumers
+ */
+ CompletableFuture<Void> disconnectActiveConsumers(boolean isResetCursor);
+
+ /**
* disconnect all consumers
*
* @return
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 2053f5c..9648e2c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -175,6 +175,11 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
}
@Override
+ public CompletableFuture<Void> disconnectActiveConsumers(boolean isResetCursor) {
+ return disconnectAllConsumers(isResetCursor);
+ }
+
+ @Override
public synchronized void resetCloseFuture() {
closeFuture = null;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 1402b40..c7af338 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -420,6 +420,11 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
}
@Override
+ public CompletableFuture<Void> disconnectActiveConsumers(boolean isResetCursor) {
+ return disconnectAllConsumers(isResetCursor);
+ }
+
+ @Override
public synchronized void resetCloseFuture() {
closeFuture = null;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index c207832..1ba4ce7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -698,7 +698,7 @@ public class PersistentSubscription implements Subscription {
// Lock the Subscription object before locking the Dispatcher object to avoid deadlocks
synchronized (this) {
if (dispatcher != null && dispatcher.isConsumerConnected()) {
- disconnectFuture = dispatcher.disconnectAllConsumers(true);
+ disconnectFuture = dispatcher.disconnectActiveConsumers(true);
} else {
disconnectFuture = CompletableFuture.completedFuture(null);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java
index c7a02aa..31130fa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java
@@ -45,7 +45,7 @@ public class PrecisTopicPublishRateThrottleTest extends BrokerTestBase{
PublishRate publishRate = new PublishRate(1,10);
// disable precis topic publish rate limiting
conf.setPreciseTopicPublishRateLimiterEnable(false);
- conf.setMaxPendingPublishdRequestsPerConnection(0);
+ conf.setMaxPendingPublishRequestsPerConnection(0);
super.baseSetup();
final String topic = "persistent://prop/ns-abc/testPrecisTopicPublishRateLimiting";
org.apache.pulsar.client.api.Producer<byte[]> producer = pulsarClient.newProducer()
@@ -84,7 +84,7 @@ public class PrecisTopicPublishRateThrottleTest extends BrokerTestBase{
public void testProducerBlockedByPrecisTopicPublishRateLimiting() throws Exception {
PublishRate publishRate = new PublishRate(1,10);
conf.setPreciseTopicPublishRateLimiterEnable(true);
- conf.setMaxPendingPublishdRequestsPerConnection(0);
+ conf.setMaxPendingPublishRequestsPerConnection(0);
super.baseSetup();
final String topic = "persistent://prop/ns-abc/testPrecisTopicPublishRateLimiting";
org.apache.pulsar.client.api.Producer<byte[]> producer = pulsarClient.newProducer()
@@ -116,7 +116,7 @@ public class PrecisTopicPublishRateThrottleTest extends BrokerTestBase{
public void testPrecisTopicPublishRateLimitingProduceRefresh() throws Exception {
PublishRate publishRate = new PublishRate(1,10);
conf.setPreciseTopicPublishRateLimiterEnable(true);
- conf.setMaxPendingPublishdRequestsPerConnection(0);
+ conf.setMaxPendingPublishRequestsPerConnection(0);
super.baseSetup();
final String topic = "persistent://prop/ns-abc/testPrecisTopicPublishRateLimiting";
org.apache.pulsar.client.api.Producer<byte[]> producer = pulsarClient.newProducer()
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index f1b58ff..54afc0e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -19,11 +19,15 @@
package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -31,6 +35,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.testng.annotations.AfterClass;
@@ -199,4 +204,82 @@ public class SubscriptionSeekTest extends BrokerTestBase {
assertEquals(backlogs, 10);
}
+ @Test
+ public void testShouldCloseAllConsumersForMultipleConsumerDispatcherWhenSeek() throws Exception {
+ final String topicName = "persistent://prop/use/ns-abc/testShouldCloseAllConsumersForMultipleConsumerDispatcherWhenSeek";
+ // Disable pre-fetch in consumer to track the messages received
+ org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName("my-subscription")
+ .subscribe();
+
+ pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName("my-subscription")
+ .subscribe();
+
+ PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+ assertNotNull(topicRef);
+ assertEquals(topicRef.getSubscriptions().size(), 1);
+ List<Consumer> consumers = topicRef.getSubscriptions().get("my-subscription").getConsumers();
+ assertEquals(consumers.size(), 2);
+ Set<String> connectedSinceSet = new HashSet<>();
+ for (Consumer consumer : consumers) {
+ connectedSinceSet.add(consumer.getStats().getConnectedSince());
+ }
+ assertEquals(connectedSinceSet.size(), 2);
+ consumer1.seek(MessageId.earliest);
+ // Wait for consumer to reconnect
+ Thread.sleep(1000);
+
+ consumers = topicRef.getSubscriptions().get("my-subscription").getConsumers();
+ assertEquals(consumers.size(), 2);
+ for (Consumer consumer : consumers) {
+ assertFalse(connectedSinceSet.contains(consumer.getStats().getConnectedSince()));
+ }
+ }
+
+ @Test
+ public void testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek() throws Exception {
+ final String topicName = "persistent://prop/use/ns-abc/testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek";
+ // Disable pre-fetch in consumer to track the messages received
+ org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionType(SubscriptionType.Failover)
+ .subscriptionName("my-subscription")
+ .subscribe();
+
+ pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionType(SubscriptionType.Failover)
+ .subscriptionName("my-subscription")
+ .subscribe();
+
+ PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+ assertNotNull(topicRef);
+ assertEquals(topicRef.getSubscriptions().size(), 1);
+ List<Consumer> consumers = topicRef.getSubscriptions().get("my-subscription").getConsumers();
+ assertEquals(consumers.size(), 2);
+ Set<String> connectedSinceSet = new HashSet<>();
+ for (Consumer consumer : consumers) {
+ connectedSinceSet.add(consumer.getStats().getConnectedSince());
+ }
+ assertEquals(connectedSinceSet.size(), 2);
+ consumer1.seek(MessageId.earliest);
+ // Wait for consumer to reconnect
+ Thread.sleep(1000);
+
+ consumers = topicRef.getSubscriptions().get("my-subscription").getConsumers();
+ assertEquals(consumers.size(), 2);
+
+ boolean hasConsumerNotDisconnected = false;
+ for (Consumer consumer : consumers) {
+ if (connectedSinceSet.contains(consumer.getStats().getConnectedSince())) {
+ hasConsumerNotDisconnected = true;
+ }
+ }
+ assertTrue(hasConsumerNotDisconnected);
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 132d8b1..69b6a5e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -73,16 +73,14 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
TopicPolicies initPolicy = TopicPolicies.builder()
.maxConsumerPerTopic(10)
.build();
- systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, initPolicy);
-
- Assert.assertNull(systemTopicBasedTopicPoliciesService.getPoliciesCacheInit(TOPIC1.getNamespaceObject()));
-
- Thread.sleep(1000);
-
+ systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, initPolicy).get();
Assert.assertTrue(systemTopicBasedTopicPoliciesService.getPoliciesCacheInit(TOPIC1.getNamespaceObject()));
+ // Wait for all topic policies updated.
+ Thread.sleep(3000);
+
// Assert broker is cache all topic policies
- Assert.assertEquals(10, systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1).getMaxConsumerPerTopic().intValue());
+ Assert.assertEquals(systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1).getMaxConsumerPerTopic().intValue(), 10);
// Update policy for TOPIC1
TopicPolicies policies1 = TopicPolicies.builder()
diff --git a/pulsar-io/batch/src/main/java/org/apache/pulsar/io/batch/BatchSourceExecutor.java b/pulsar-io/batch/src/main/java/org/apache/pulsar/io/batch/BatchSourceExecutor.java
index 88a011e..e624db9 100644
--- a/pulsar-io/batch/src/main/java/org/apache/pulsar/io/batch/BatchSourceExecutor.java
+++ b/pulsar-io/batch/src/main/java/org/apache/pulsar/io/batch/BatchSourceExecutor.java
@@ -25,7 +25,7 @@ import org.apache.pulsar.common.io.BatchSourceConfig;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.utils.Actions;
import org.apache.pulsar.functions.utils.FunctionCommon;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.io.core.*;