You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/03 09:52:58 UTC

[pulsar] branch master updated: Only close active consumer for Failover subscription when seek(). (#7141)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new dbe5903  Only close active consumer for Failover subscription when seek(). (#7141)
dbe5903 is described below

commit dbe5903a08fb64975ad0aece5fc857aa529ef1d1
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Jun 3 17:52:38 2020 +0800

    Only close active consumer for Failover subscription when seek(). (#7141)
    
    Related to #5278
    
    ### Motivation
    
    When seek() is called on a Failover subscription, disconnect only the active consumer instead of all connected consumers.
    
    ### Verifying this change
    
    Unit tests added
    
    ### Does this pull request potentially affect one of the following parts:
    
    *If `yes` was chosen, please highlight the changes*
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API: (no)
      - The schema: (no)
      - The default values of configurations: (no)
      - The wire protocol: (no)
      - The rest endpoints: (no)
      - The admin cli options: (no)
      - Anything that affects deployment: (no)
    
    ### Documentation
    
      - Does this pull request introduce a new feature? (no)
---
 .../ci-integration-backwards-compatibility.yaml    |  2 +-
 .github/workflows/ci-integration-cli.yaml          |  2 +-
 .../workflows/ci-integration-function-state.yaml   |  2 +-
 .github/workflows/ci-integration-messaging.yaml    |  2 +-
 .github/workflows/ci-integration-schema.yaml       |  2 +-
 .github/workflows/ci-integration-sql.yaml          |  2 +-
 .github/workflows/ci-integration-standalone.yaml   |  2 +-
 .../ci-integration-tiered-filesystem.yaml          |  2 +-
 .../workflows/ci-integration-tiered-jcloud.yaml    |  2 +-
 .../AbstractDispatcherSingleActiveConsumer.java    |  9 +++
 .../apache/pulsar/broker/service/Dispatcher.java   |  5 ++
 .../NonPersistentDispatcherMultipleConsumers.java  |  5 ++
 .../PersistentDispatcherMultipleConsumers.java     |  5 ++
 .../service/persistent/PersistentSubscription.java |  2 +-
 .../PrecisTopicPublishRateThrottleTest.java        |  6 +-
 .../broker/service/SubscriptionSeekTest.java       | 83 ++++++++++++++++++++++
 .../SystemTopicBasedTopicPoliciesServiceTest.java  | 12 ++--
 .../pulsar/io/batch/BatchSourceExecutor.java       |  2 +-
 18 files changed, 126 insertions(+), 21 deletions(-)

diff --git a/.github/workflows/ci-integration-backwards-compatibility.yaml b/.github/workflows/ci-integration-backwards-compatibility.yaml
index c9a5ced..58fba8c 100644
--- a/.github/workflows/ci-integration-backwards-compatibility.yaml
+++ b/.github/workflows/ci-integration-backwards-compatibility.yaml
@@ -53,7 +53,7 @@ jobs:
         if: steps.docs.outputs.changed_only == 'no'
         run: |
           sudo swapoff -a
-          sudo rm -f /swapfile
+          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
           sudo apt clean
           docker rmi $(docker images -q) -f
           df -h
diff --git a/.github/workflows/ci-integration-cli.yaml b/.github/workflows/ci-integration-cli.yaml
index f74510b..4ec3991 100644
--- a/.github/workflows/ci-integration-cli.yaml
+++ b/.github/workflows/ci-integration-cli.yaml
@@ -53,7 +53,7 @@ jobs:
         if: steps.docs.outputs.changed_only == 'no'
         run: |
           sudo swapoff -a
-          sudo rm -f /swapfile
+          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
           sudo apt clean
           docker rmi $(docker images -q) -f
           df -h
diff --git a/.github/workflows/ci-integration-function-state.yaml b/.github/workflows/ci-integration-function-state.yaml
index 1574f7e..beb4b7b 100644
--- a/.github/workflows/ci-integration-function-state.yaml
+++ b/.github/workflows/ci-integration-function-state.yaml
@@ -53,7 +53,7 @@ jobs:
         if: steps.docs.outputs.changed_only == 'no'
         run: |
           sudo swapoff -a
-          sudo rm -f /swapfile
+          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
           sudo apt clean
           docker rmi $(docker images -q) -f
           df -h
diff --git a/.github/workflows/ci-integration-messaging.yaml b/.github/workflows/ci-integration-messaging.yaml
index 7db4a1b..907ba2d 100644
--- a/.github/workflows/ci-integration-messaging.yaml
+++ b/.github/workflows/ci-integration-messaging.yaml
@@ -53,7 +53,7 @@ jobs:
         if: steps.docs.outputs.changed_only == 'no'
         run: |
           sudo swapoff -a
-          sudo rm -f /swapfile
+          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
           sudo apt clean
           docker rmi $(docker images -q) -f
           df -h
diff --git a/.github/workflows/ci-integration-schema.yaml b/.github/workflows/ci-integration-schema.yaml
index c21dfe7..f563b7a 100644
--- a/.github/workflows/ci-integration-schema.yaml
+++ b/.github/workflows/ci-integration-schema.yaml
@@ -53,7 +53,7 @@ jobs:
         if: steps.docs.outputs.changed_only == 'no'
         run: |
           sudo swapoff -a
-          sudo rm -f /swapfile
+          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
           sudo apt clean
           docker rmi $(docker images -q) -f
           df -h
diff --git a/.github/workflows/ci-integration-sql.yaml b/.github/workflows/ci-integration-sql.yaml
index daefeda..5b4b071 100644
--- a/.github/workflows/ci-integration-sql.yaml
+++ b/.github/workflows/ci-integration-sql.yaml
@@ -53,7 +53,7 @@ jobs:
         if: steps.docs.outputs.changed_only == 'no'
         run: |
           sudo swapoff -a
-          sudo rm -f /swapfile
+          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
           sudo apt clean
           docker rmi $(docker images -q) -f
           df -h
diff --git a/.github/workflows/ci-integration-standalone.yaml b/.github/workflows/ci-integration-standalone.yaml
index 2b6a231..e6c6a21 100644
--- a/.github/workflows/ci-integration-standalone.yaml
+++ b/.github/workflows/ci-integration-standalone.yaml
@@ -53,7 +53,7 @@ jobs:
         if: steps.docs.outputs.changed_only == 'no'
         run: |
           sudo swapoff -a
-          sudo rm -f /swapfile
+          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
           sudo apt clean
           docker rmi $(docker images -q) -f
           df -h
diff --git a/.github/workflows/ci-integration-tiered-filesystem.yaml b/.github/workflows/ci-integration-tiered-filesystem.yaml
index 472cbce..95e3c4e 100644
--- a/.github/workflows/ci-integration-tiered-filesystem.yaml
+++ b/.github/workflows/ci-integration-tiered-filesystem.yaml
@@ -53,7 +53,7 @@ jobs:
         if: steps.docs.outputs.changed_only == 'no'
         run: |
           sudo swapoff -a
-          sudo rm -f /swapfile
+          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
           sudo apt clean
           docker rmi $(docker images -q) -f
           df -h
diff --git a/.github/workflows/ci-integration-tiered-jcloud.yaml b/.github/workflows/ci-integration-tiered-jcloud.yaml
index 1594452..36042e3 100644
--- a/.github/workflows/ci-integration-tiered-jcloud.yaml
+++ b/.github/workflows/ci-integration-tiered-jcloud.yaml
@@ -53,7 +53,7 @@ jobs:
         if: steps.docs.outputs.changed_only == 'no'
         run: |
           sudo swapoff -a
-          sudo rm -f /swapfile
+          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
           sudo apt clean
           docker rmi $(docker images -q) -f
           df -h
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index a093d07..6c5f8a7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -245,6 +245,15 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
         return closeFuture;
     }
 
+    public synchronized CompletableFuture<Void> disconnectActiveConsumers(boolean isResetCursor) {
+        closeFuture = new CompletableFuture<>();
+        if (activeConsumer != null) {
+            activeConsumer.disconnect(isResetCursor);
+        }
+        closeFuture.complete(null);
+        return closeFuture;
+    }
+
     @Override
     public synchronized void resetCloseFuture() {
         closeFuture = null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index cffa024..7b789e6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -56,6 +56,11 @@ public interface Dispatcher {
     boolean isClosed();
 
     /**
+     * Disconnect active consumers
+     */
+    CompletableFuture<Void> disconnectActiveConsumers(boolean isResetCursor);
+
+    /**
      * disconnect all consumers
      *
      * @return
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 2053f5c..9648e2c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -175,6 +175,11 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
     }
 
     @Override
+    public CompletableFuture<Void> disconnectActiveConsumers(boolean isResetCursor) {
+        return disconnectAllConsumers(isResetCursor);
+    }
+
+    @Override
     public synchronized void resetCloseFuture() {
         closeFuture = null;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 1402b40..c7af338 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -420,6 +420,11 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     }
 
     @Override
+    public CompletableFuture<Void> disconnectActiveConsumers(boolean isResetCursor) {
+        return disconnectAllConsumers(isResetCursor);
+    }
+
+    @Override
     public synchronized void resetCloseFuture() {
         closeFuture = null;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index c207832..1ba4ce7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -698,7 +698,7 @@ public class PersistentSubscription implements Subscription {
         // Lock the Subscription object before locking the Dispatcher object to avoid deadlocks
         synchronized (this) {
             if (dispatcher != null && dispatcher.isConsumerConnected()) {
-                disconnectFuture = dispatcher.disconnectAllConsumers(true);
+                disconnectFuture = dispatcher.disconnectActiveConsumers(true);
             } else {
                 disconnectFuture = CompletableFuture.completedFuture(null);
             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java
index c7a02aa..31130fa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java
@@ -45,7 +45,7 @@ public class PrecisTopicPublishRateThrottleTest extends BrokerTestBase{
         PublishRate publishRate = new PublishRate(1,10);
         // disable precis topic publish rate limiting
         conf.setPreciseTopicPublishRateLimiterEnable(false);
-        conf.setMaxPendingPublishdRequestsPerConnection(0);
+        conf.setMaxPendingPublishRequestsPerConnection(0);
         super.baseSetup();
         final String topic = "persistent://prop/ns-abc/testPrecisTopicPublishRateLimiting";
         org.apache.pulsar.client.api.Producer<byte[]> producer = pulsarClient.newProducer()
@@ -84,7 +84,7 @@ public class PrecisTopicPublishRateThrottleTest extends BrokerTestBase{
     public void testProducerBlockedByPrecisTopicPublishRateLimiting() throws Exception {
         PublishRate publishRate = new PublishRate(1,10);
         conf.setPreciseTopicPublishRateLimiterEnable(true);
-        conf.setMaxPendingPublishdRequestsPerConnection(0);
+        conf.setMaxPendingPublishRequestsPerConnection(0);
         super.baseSetup();
         final String topic = "persistent://prop/ns-abc/testPrecisTopicPublishRateLimiting";
         org.apache.pulsar.client.api.Producer<byte[]> producer = pulsarClient.newProducer()
@@ -116,7 +116,7 @@ public class PrecisTopicPublishRateThrottleTest extends BrokerTestBase{
     public void testPrecisTopicPublishRateLimitingProduceRefresh() throws Exception {
         PublishRate publishRate = new PublishRate(1,10);
         conf.setPreciseTopicPublishRateLimiterEnable(true);
-        conf.setMaxPendingPublishdRequestsPerConnection(0);
+        conf.setMaxPendingPublishRequestsPerConnection(0);
         super.baseSetup();
         final String topic = "persistent://prop/ns-abc/testPrecisTopicPublishRateLimiting";
         org.apache.pulsar.client.api.Producer<byte[]> producer = pulsarClient.newProducer()
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index f1b58ff..54afc0e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -19,11 +19,15 @@
 package org.apache.pulsar.broker.service;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -31,6 +35,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.RelativeTimeUtil;
 import org.testng.annotations.AfterClass;
@@ -199,4 +204,82 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         assertEquals(backlogs, 10);
     }
 
+    @Test
+    public void testShouldCloseAllConsumersForMultipleConsumerDispatcherWhenSeek() throws Exception {
+        final String topicName = "persistent://prop/use/ns-abc/testShouldCloseAllConsumersForMultipleConsumerDispatcherWhenSeek";
+        // Disable pre-fetch in consumer to track the messages received
+        org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("my-subscription")
+                .subscribe();
+
+        pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("my-subscription")
+                .subscribe();
+
+        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+        assertNotNull(topicRef);
+        assertEquals(topicRef.getSubscriptions().size(), 1);
+        List<Consumer> consumers = topicRef.getSubscriptions().get("my-subscription").getConsumers();
+        assertEquals(consumers.size(), 2);
+        Set<String> connectedSinceSet = new HashSet<>();
+        for (Consumer consumer : consumers) {
+            connectedSinceSet.add(consumer.getStats().getConnectedSince());
+        }
+        assertEquals(connectedSinceSet.size(), 2);
+        consumer1.seek(MessageId.earliest);
+        // Wait for consumer to reconnect
+        Thread.sleep(1000);
+
+        consumers = topicRef.getSubscriptions().get("my-subscription").getConsumers();
+        assertEquals(consumers.size(), 2);
+        for (Consumer consumer : consumers) {
+            assertFalse(connectedSinceSet.contains(consumer.getStats().getConnectedSince()));
+        }
+    }
+
+    @Test
+    public void testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek() throws Exception {
+        final String topicName = "persistent://prop/use/ns-abc/testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek";
+        // Disable pre-fetch in consumer to track the messages received
+        org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Failover)
+                .subscriptionName("my-subscription")
+                .subscribe();
+
+        pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Failover)
+                .subscriptionName("my-subscription")
+                .subscribe();
+
+        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+        assertNotNull(topicRef);
+        assertEquals(topicRef.getSubscriptions().size(), 1);
+        List<Consumer> consumers = topicRef.getSubscriptions().get("my-subscription").getConsumers();
+        assertEquals(consumers.size(), 2);
+        Set<String> connectedSinceSet = new HashSet<>();
+        for (Consumer consumer : consumers) {
+            connectedSinceSet.add(consumer.getStats().getConnectedSince());
+        }
+        assertEquals(connectedSinceSet.size(), 2);
+        consumer1.seek(MessageId.earliest);
+        // Wait for consumer to reconnect
+        Thread.sleep(1000);
+
+        consumers = topicRef.getSubscriptions().get("my-subscription").getConsumers();
+        assertEquals(consumers.size(), 2);
+
+        boolean hasConsumerNotDisconnected = false;
+        for (Consumer consumer : consumers) {
+            if (connectedSinceSet.contains(consumer.getStats().getConnectedSince())) {
+                hasConsumerNotDisconnected = true;
+            }
+        }
+        assertTrue(hasConsumerNotDisconnected);
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 132d8b1..69b6a5e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -73,16 +73,14 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
         TopicPolicies initPolicy = TopicPolicies.builder()
                 .maxConsumerPerTopic(10)
                 .build();
-        systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, initPolicy);
-
-        Assert.assertNull(systemTopicBasedTopicPoliciesService.getPoliciesCacheInit(TOPIC1.getNamespaceObject()));
-
-        Thread.sleep(1000);
-
+        systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, initPolicy).get();
         Assert.assertTrue(systemTopicBasedTopicPoliciesService.getPoliciesCacheInit(TOPIC1.getNamespaceObject()));
 
+        // Wait for all topic policies updated.
+        Thread.sleep(3000);
+
         // Assert broker is cache all topic policies
-        Assert.assertEquals(10, systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1).getMaxConsumerPerTopic().intValue());
+        Assert.assertEquals(systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1).getMaxConsumerPerTopic().intValue(), 10);
 
         // Update policy for TOPIC1
         TopicPolicies policies1 = TopicPolicies.builder()
diff --git a/pulsar-io/batch/src/main/java/org/apache/pulsar/io/batch/BatchSourceExecutor.java b/pulsar-io/batch/src/main/java/org/apache/pulsar/io/batch/BatchSourceExecutor.java
index 88a011e..e624db9 100644
--- a/pulsar-io/batch/src/main/java/org/apache/pulsar/io/batch/BatchSourceExecutor.java
+++ b/pulsar-io/batch/src/main/java/org/apache/pulsar/io/batch/BatchSourceExecutor.java
@@ -25,7 +25,7 @@ import org.apache.pulsar.common.io.BatchSourceConfig;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.utils.Actions;
 import org.apache.pulsar.functions.utils.FunctionCommon;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.utils.SourceConfigUtils;
 import org.apache.pulsar.io.core.*;