You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/18 07:19:03 UTC

[pulsar] branch branch-2.8 updated (28a223c -> 4cc0821)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 28a223c  Improved logic for pausing replicated subscription snapshots when no traffic (#11922)
     new 0c29a88  [C++] Make some clean up methods thread safe (#11762)
     new 0b9d51b  [clean up] Remove unused variable and unneccessary box in NamespaceBundleFactory (#11975)
     new dbddfe5  [pulsar-functions] Pass `SubscriptionPosition` from `FunctionDetails` to `FunctionConfig` / `SinkConfig` (#11831)
     new 4cc0821  Fix wrong key-hash selector used for new consumers after all the previous consumers disconnected (#12035)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 ...istentStickyKeyDispatcherMultipleConsumers.java | 43 +++++++++++++
 .../nonpersistent/NonPersistentSubscription.java   | 35 +++--------
 ...istentStickyKeyDispatcherMultipleConsumers.java | 10 ++-
 .../service/persistent/PersistentSubscription.java |  8 ++-
 .../common/naming/NamespaceBundleFactory.java      |  5 +-
 .../client/api/KeySharedSubscriptionTest.java      | 71 ++++++++++++++++++++++
 .../pulsar/common/functions/FunctionConfig.java    |  3 +-
 .../org/apache/pulsar/common/io/SinkConfig.java    |  3 +-
 pulsar-client-cpp/lib/ClientImpl.cc                | 13 +++-
 pulsar-client-cpp/lib/ConnectionPool.cc            | 14 ++++-
 pulsar-client-cpp/lib/ConnectionPool.h             |  9 ++-
 pulsar-client-cpp/lib/ExecutorService.cc           |  7 +++
 pulsar-client-cpp/lib/ExecutorService.h            |  3 +
 pulsar-client-cpp/lib/Producer.cc                  |  1 -
 pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc     | 10 +++
 pulsar-client-cpp/tests/BinaryLookupServiceTest.cc |  2 -
 pulsar-client-cpp/tests/CustomLoggerTest.cc        |  4 +-
 pulsar-client-cpp/tests/MessageTest.cc             |  2 -
 pulsar-client-cpp/tests/ReaderConfigurationTest.cc |  3 -
 .../functions/instance/JavaInstanceRunnable.java   | 12 ++--
 .../pulsar/functions/utils/FunctionCommon.java     | 10 +++
 .../functions/utils/FunctionConfigUtils.java       | 21 ++++---
 .../pulsar/functions/utils/SinkConfigUtils.java    |  6 ++
 23 files changed, 231 insertions(+), 64 deletions(-)

[pulsar] 03/04: [pulsar-functions] Pass `SubscriptionPosition` from `FunctionDetails` to `FunctionConfig` / `SinkConfig` (#11831)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit dbddfe5282fa5c0d0ff5ec85db624672e4c9a603
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Wed Sep 15 08:53:56 2021 +0800

    [pulsar-functions] Pass `SubscriptionPosition` from `FunctionDetails` to `FunctionConfig` / `SinkConfig` (#11831)
    
    * pass SubscriptionPosition from FunctionDetails to config
    
    * address comment
    
    * reduce code duplication
    
    * set subscriptionPosition in FunctionConfig, SinkConfig with init value
    
    * fix default values
    
    * fix CI
    
    * revert init data
    
    * fix CI
    
    * fix CI
    
    * fix CI
    
    (cherry picked from commit bfd6542ac17c24ff38134a843b3f372cad9246e8)
---
 .../pulsar/common/functions/FunctionConfig.java     |  3 ++-
 .../org/apache/pulsar/common/io/SinkConfig.java     |  3 ++-
 .../functions/instance/JavaInstanceRunnable.java    | 12 ++++--------
 .../pulsar/functions/utils/FunctionCommon.java      | 10 ++++++++++
 .../pulsar/functions/utils/FunctionConfigUtils.java | 21 ++++++++++++++-------
 .../pulsar/functions/utils/SinkConfigUtils.java     |  6 ++++++
 6 files changed, 38 insertions(+), 17 deletions(-)

diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index 527f9b8..0fe539f 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -126,5 +126,6 @@ public class FunctionConfig {
     // Whether the pulsar admin client exposed to function context, default is disabled.
     private Boolean exposePulsarAdminClientEnabled;
 
-    private SubscriptionInitialPosition subscriptionPosition;
+    @Builder.Default
+    private SubscriptionInitialPosition subscriptionPosition = SubscriptionInitialPosition.Latest;
 }
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
index 4a0d093..9bf171a 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
@@ -43,7 +43,8 @@ public class SinkConfig {
     private String name;
     private String className;
     private String sourceSubscriptionName;
-    private SubscriptionInitialPosition sourceSubscriptionPosition;
+    @Builder.Default
+    private SubscriptionInitialPosition sourceSubscriptionPosition = SubscriptionInitialPosition.Latest;
 
     private Collection<String> inputs;
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 692902e..f6ca84d 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -19,6 +19,7 @@
 
 package org.apache.pulsar.functions.instance;
 
+import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Preconditions;
 
@@ -669,14 +670,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                     FunctionConfig.ProcessingGuarantees.valueOf(
                             this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
 
-            switch (sourceSpec.getSubscriptionPosition()) {
-                case EARLIEST:
-                    pulsarSourceConfig.setSubscriptionPosition(SubscriptionInitialPosition.Earliest);
-                    break;
-                default:
-                    pulsarSourceConfig.setSubscriptionPosition(SubscriptionInitialPosition.Latest);
-                    break;
-            }
+            pulsarSourceConfig.setSubscriptionPosition(
+                    convertFromFunctionDetailsSubscriptionPosition(sourceSpec.getSubscriptionPosition())
+            );
 
             Preconditions.checkNotNull(contextImpl.getSubscriptionType());
             pulsarSourceConfig.setSubscriptionType(contextImpl.getSubscriptionType());
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index 2e433ad..a13695e 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -46,6 +46,7 @@ import lombok.extern.slf4j.Slf4j;
 import net.jodah.typetools.TypeResolver;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.common.functions.FunctionConfig;
@@ -501,4 +502,13 @@ public class FunctionCommon {
 
         return false;
     }
+
+    public static SubscriptionInitialPosition convertFromFunctionDetailsSubscriptionPosition(
+            org.apache.pulsar.functions.proto.Function.SubscriptionPosition subscriptionPosition) {
+        if (org.apache.pulsar.functions.proto.Function.SubscriptionPosition.EARLIEST.equals(subscriptionPosition)) {
+            return SubscriptionInitialPosition.Earliest;
+        } else {
+            return SubscriptionInitialPosition.Latest;
+        }
+    }
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 13089e4..a2579b7 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -47,6 +47,7 @@ import static org.apache.commons.lang.StringUtils.isNotEmpty;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.pulsar.common.functions.Utils.BUILTIN;
 import static org.apache.pulsar.common.util.ClassLoaderUtils.loadJar;
+import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition;
 
 @Slf4j
 public class FunctionConfigUtils {
@@ -158,15 +159,16 @@ public class FunctionConfigUtils {
         }
 
         // Set subscription position
-        Function.SubscriptionPosition subPosition;
-        if (functionConfig.getSubscriptionPosition() == SubscriptionInitialPosition.Earliest) {
-            subPosition = Function.SubscriptionPosition.EARLIEST;
-        } else {
-            subPosition = Function.SubscriptionPosition.LATEST;
+        if (functionConfig.getSubscriptionPosition() != null) {
+            Function.SubscriptionPosition subPosition = null;
+            if (SubscriptionInitialPosition.Earliest == functionConfig.getSubscriptionPosition()) {
+                subPosition = Function.SubscriptionPosition.EARLIEST;
+            } else {
+                subPosition = Function.SubscriptionPosition.LATEST;
+            }
+            sourceSpecBuilder.setSubscriptionPosition(subPosition);
         }
 
-        sourceSpecBuilder.setSubscriptionPosition(subPosition);
-
         if (typeArgs != null) {
             sourceSpecBuilder.setTypeClassName(typeArgs[0].getName());
         }
@@ -367,6 +369,11 @@ public class FunctionConfigUtils {
 
         functionConfig.setCleanupSubscription(functionDetails.getSource().getCleanupSubscription());
         functionConfig.setAutoAck(functionDetails.getAutoAck());
+
+        // Set subscription position
+        functionConfig.setSubscriptionPosition(
+                convertFromFunctionDetailsSubscriptionPosition(functionDetails.getSource().getSubscriptionPosition()));
+
         if (functionDetails.getSource().getTimeoutMs() != 0) {
             functionConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
         }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index fd6f35c..a8180ab 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -55,6 +55,7 @@ import java.util.Map;
 
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition;
 import static org.apache.pulsar.functions.utils.FunctionCommon.convertProcessingGuarantee;
 import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType;
 
@@ -284,6 +285,11 @@ public class SinkConfigUtils {
             sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
         }
         sinkConfig.setAutoAck(functionDetails.getAutoAck());
+
+        // Set subscription position
+        sinkConfig.setSourceSubscriptionPosition(
+                convertFromFunctionDetailsSubscriptionPosition(functionDetails.getSource().getSubscriptionPosition()));
+
         if (functionDetails.getSource().getTimeoutMs() != 0) {
             sinkConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
         }

[pulsar] 04/04: Fix wrong key-hash selector used for new consumers after all the previous consumers disconnected (#12035)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4cc082170cb7346acf808a4842a2634339c3853d
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Sep 17 14:35:02 2021 +0800

    Fix wrong key-hash selector used for new consumers after all the previous consumers disconnected (#12035)
    
    This issue occurs after all the previous consumers have disconnected and new consumers connect
    to the topic with a different key_shared policy.
    
    The root cause is that we keep using the previous dispatcher after the key_shared policy has changed,
    so the fix is to create a new dispatcher when a new consumer connects with a different key_shared policy.
    
    (cherry picked from commit 3a4755f50ef46c3d94ce9629478941d5224cb800)
---
 ...istentStickyKeyDispatcherMultipleConsumers.java | 43 +++++++++++++
 .../nonpersistent/NonPersistentSubscription.java   | 35 +++--------
 ...istentStickyKeyDispatcherMultipleConsumers.java | 10 ++-
 .../service/persistent/PersistentSubscription.java |  8 ++-
 .../client/api/KeySharedSubscriptionTest.java      | 71 ++++++++++++++++++++++
 5 files changed, 136 insertions(+), 31 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index 704fd93..878bac8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -18,28 +18,67 @@
  */
 package org.apache.pulsar.broker.service.nonpersistent;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.FastThreadLocal;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.EntryBatchSizes;
+import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
+import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
+import org.apache.pulsar.common.api.proto.KeySharedMeta;
+import org.apache.pulsar.common.api.proto.KeySharedMode;
 import org.apache.pulsar.common.protocol.Commands;
 
 public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersistentDispatcherMultipleConsumers {
 
     private final StickyKeyConsumerSelector selector;
+    private final KeySharedMode keySharedMode;
 
     public NonPersistentStickyKeyDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription,
+                                                             KeySharedMeta ksm) {
+        super(topic, subscription);
+        this.keySharedMode = ksm.getKeySharedMode();
+        switch (this.keySharedMode) {
+            case STICKY:
+                this.selector = new HashRangeExclusiveStickyKeyConsumerSelector();
+                break;
+
+            case AUTO_SPLIT:
+            default:
+                ServiceConfiguration conf = topic.getBrokerService().getPulsar().getConfiguration();
+                if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
+                    this.selector = new ConsistentHashingStickyKeyConsumerSelector(
+                            conf.getSubscriptionKeySharedConsistentHashingReplicaPoints());
+                } else {
+                    this.selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
+                }
+                break;
+        }
+    }
+
+    @VisibleForTesting
+    NonPersistentStickyKeyDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription,
                                                              StickyKeyConsumerSelector selector) {
         super(topic, subscription);
+        if (selector instanceof HashRangeExclusiveStickyKeyConsumerSelector) {
+            keySharedMode = KeySharedMode.STICKY;
+        } else if (selector instanceof ConsistentHashingStickyKeyConsumerSelector
+                || selector instanceof HashRangeAutoSplitStickyKeyConsumerSelector) {
+            keySharedMode = KeySharedMode.AUTO_SPLIT;
+        } else {
+            keySharedMode = null;
+        }
         this.selector = selector;
     }
 
@@ -121,4 +160,8 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
             }
         }
     }
+
+    public KeySharedMode getKeySharedMode() {
+        return keySharedMode;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index a400fae..a392e41 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -28,22 +28,19 @@ import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFencedException;
-import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
-import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
-import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
-import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
+import org.apache.pulsar.common.api.proto.KeySharedMeta;
+import org.apache.pulsar.common.api.proto.KeySharedMode;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.NonPersistentSubscriptionStatsImpl;
@@ -143,29 +140,13 @@ public class NonPersistentSubscription implements Subscription {
                 }
                 break;
             case Key_Shared:
-                if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared) {
+                KeySharedMeta ksm = consumer.getKeySharedMeta();
+                KeySharedMode keySharedMode = ksm.getKeySharedMode();
+                if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared
+                        || ((NonPersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getKeySharedMode()
+                        != keySharedMode) {
                     previousDispatcher = dispatcher;
-
-                    switch (consumer.getKeySharedMeta().getKeySharedMode()) {
-                        case STICKY:
-                            dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this,
-                                    new HashRangeExclusiveStickyKeyConsumerSelector());
-                            break;
-
-                        case AUTO_SPLIT:
-                        default:
-                            StickyKeyConsumerSelector selector;
-                            ServiceConfiguration conf = topic.getBrokerService().getPulsar().getConfiguration();
-                            if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
-                                selector = new ConsistentHashingStickyKeyConsumerSelector(
-                                        conf.getSubscriptionKeySharedConsistentHashingReplicaPoints());
-                            } else {
-                                selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
-                            }
-
-                            dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this, selector);
-                            break;
-                    }
+                    this.dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this, ksm);
                 }
                 break;
             default:
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index d4d64e2..e62f63e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
+import org.apache.pulsar.common.api.proto.KeySharedMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,6 +58,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
     private final StickyKeyConsumerSelector selector;
 
     private boolean isDispatcherStuckOnReplays = false;
+    private final KeySharedMode keySharedMode;
 
     /**
      * When a consumer joins, it will be added to this map with the current read position.
@@ -76,8 +78,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new LinkedHashMap<>();
         this.stuckConsumers = new HashSet<>();
         this.nextStuckConsumers = new HashSet<>();
-
-        switch (ksm.getKeySharedMode()) {
+        this.keySharedMode = ksm.getKeySharedMode();
+        switch (this.keySharedMode) {
         case AUTO_SPLIT:
             if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
                 selector = new ConsistentHashingStickyKeyConsumerSelector(
@@ -408,6 +410,10 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         return cursor.asyncReplayEntries(positions, this, ReadType.Replay, true);
     }
 
+    public KeySharedMode getKeySharedMode() {
+        return this.keySharedMode;
+    }
+
     public LinkedHashMap<Consumer, PositionImpl> getRecentlyJoinedConsumers() {
         return recentlyJoinedConsumers;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index cf01ace..1eb99e9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -66,6 +66,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
+import org.apache.pulsar.common.api.proto.KeySharedMode;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
 import org.apache.pulsar.common.api.proto.TxnAction;
@@ -256,9 +257,12 @@ public class PersistentSubscription implements Subscription {
                             }
                             break;
                         case Key_Shared:
-                            if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared) {
+                            KeySharedMeta ksm = consumer.getKeySharedMeta();
+                            KeySharedMode keySharedMode = ksm.getKeySharedMode();
+                            if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared
+                                    || ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getKeySharedMode()
+                                    != keySharedMode) {
                                 previousDispatcher = dispatcher;
-                                KeySharedMeta ksm = consumer.getKeySharedMeta();
                                 dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
                                         topic.getBrokerService().getPulsar().getConfiguration(), ksm);
                             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index e785ac1..bc19852 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.api;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -39,6 +40,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -47,8 +49,12 @@ import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.curator.shaded.com.google.common.collect.Lists;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.nonpersistent.NonPersistentStickyKeyDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.common.api.proto.KeySharedMode;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.awaitility.Awaitility;
@@ -93,6 +99,14 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         };
     }
 
+    @DataProvider(name = "topicDomain")
+    public Object[][] topicDomainProvider() {
+        return new Object[][] {
+                { "persistent" },
+                { "non-persistent" }
+        };
+    }
+
     @BeforeMethod(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
@@ -1012,6 +1026,63 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         });
     }
 
+    @Test(dataProvider = "topicDomain")
+    public void testSelectorChangedAfterAllConsumerDisconnected(String topicDomain) throws PulsarClientException,
+            ExecutionException, InterruptedException {
+        final String topicName = TopicName.get(topicDomain, "public", "default",
+                "testSelectorChangedAfterAllConsumerDisconnected" + UUID.randomUUID()).toString();
+
+        final String subName = "my-sub";
+
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                .consumerName("first-consumer")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
+                .cryptoKeyReader(new EncKeyReader())
+                .subscribe();
+
+        CompletableFuture<Optional<Topic>> future = pulsar.getBrokerService().getTopicIfExists(topicName);
+        assertTrue(future.isDone());
+        assertTrue(future.get().isPresent());
+        Topic topic = future.get().get();
+        KeySharedMode keySharedMode = getKeySharedModeOfSubscription(topic, subName);
+        assertNotNull(keySharedMode);
+        assertEquals(keySharedMode, KeySharedMode.AUTO_SPLIT);
+
+        consumer1.close();
+
+        consumer1 = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                .consumerName("second-consumer")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(0, 65535)))
+                .cryptoKeyReader(new EncKeyReader())
+                .subscribe();
+
+        future = pulsar.getBrokerService().getTopicIfExists(topicName);
+        assertTrue(future.isDone());
+        assertTrue(future.get().isPresent());
+        topic = future.get().get();
+        keySharedMode = getKeySharedModeOfSubscription(topic, subName);
+        assertNotNull(keySharedMode);
+        assertEquals(keySharedMode, KeySharedMode.STICKY);
+        consumer1.close();
+    }
+
+    private KeySharedMode getKeySharedModeOfSubscription(Topic topic, String subscription) {
+        if (TopicName.get(topic.getName()).getDomain().equals(TopicDomain.persistent)) {
+            return ((PersistentStickyKeyDispatcherMultipleConsumers) topic.getSubscription(subscription)
+                    .getDispatcher()).getKeySharedMode();
+        } else if (TopicName.get(topic.getName()).getDomain().equals(TopicDomain.non_persistent)) {
+            return ((NonPersistentStickyKeyDispatcherMultipleConsumers) topic.getSubscription(subscription)
+                    .getDispatcher()).getKeySharedMode();
+        }
+        return null;
+    }
+
     private Consumer<String> createFixedHashRangesConsumer(String topic, String subscription, Range... ranges) throws PulsarClientException {
         return pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic)

[pulsar] 02/04: [clean up] Remove unused variable and unneccessary box in NamespaceBundleFactory (#11975)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0b9d51bb49a9fb7a94ba5d38b52dc7ea287f7731
Author: Shoothzj <sh...@gmail.com>
AuthorDate: Fri Sep 10 11:56:35 2021 +0800

    [clean up] Remove unused variable and unneccessary box in NamespaceBundleFactory (#11975)
    
    Clean up: remove an unused variable and unnecessary boxing in NamespaceBundleFactory.
    
    (cherry picked from commit 7ff9c6e19e1bfcc6abd94f258d99e463c3c5f0df)
---
 .../java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
index a62ea93..3e30148 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
@@ -92,7 +92,6 @@ public class NamespaceBundleFactory {
         CompletableFuture<NamespaceBundles> future = new CompletableFuture<>();
         // Read the static bundle data from the policies
         pulsar.getLocalMetadataStore().get(path).thenAccept(result -> {
-            NamespaceBundles namespaceBundles;
 
             if (result.isPresent()) {
                 try {
@@ -276,9 +275,9 @@ public class NamespaceBundleFactory {
                     splitPartition = i;
                     Long maxVal = sourceBundle.partitions[i + 1];
                     Long minVal = sourceBundle.partitions[i];
-                    Long segSize = splitBoundary == null ? (maxVal - minVal) / numBundles : splitBoundary - minVal;
+                    long segSize = splitBoundary == null ? (maxVal - minVal) / numBundles : splitBoundary - minVal;
                     partitions[pos++] = minVal;
-                    Long curPartition = minVal + segSize;
+                    long curPartition = minVal + segSize;
                     for (int j = 0; j < numBundles - 1; j++) {
                         partitions[pos++] = curPartition;
                         curPartition += segSize;

[pulsar] 01/04: [C++] Make some clean up methods thread safe (#11762)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0c29a884812247663168511d5c71b15257734f06
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Aug 25 14:19:49 2021 +0800

    [C++] Make some clean up methods thread safe (#11762)
    
    * Make some close methods thread safe
    
    * Restore shutdown() in ClientImpl's destructor and check whether connection pool is closed
    
    (cherry picked from commit 098ba16c15e81dad84e5b03a6565f8bb9941ad7a)
---
 pulsar-client-cpp/lib/ClientImpl.cc                | 13 ++++++++++++-
 pulsar-client-cpp/lib/ConnectionPool.cc            | 14 +++++++++++++-
 pulsar-client-cpp/lib/ConnectionPool.h             |  9 ++++++++-
 pulsar-client-cpp/lib/ExecutorService.cc           |  7 +++++++
 pulsar-client-cpp/lib/ExecutorService.h            |  3 +++
 pulsar-client-cpp/lib/Producer.cc                  |  1 -
 pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc     | 10 ++++++++++
 pulsar-client-cpp/tests/BinaryLookupServiceTest.cc |  2 --
 pulsar-client-cpp/tests/CustomLoggerTest.cc        |  4 ++--
 pulsar-client-cpp/tests/MessageTest.cc             |  2 --
 pulsar-client-cpp/tests/ReaderConfigurationTest.cc |  3 ---
 11 files changed, 55 insertions(+), 13 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 5abe6ec..613a979 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -560,10 +560,21 @@ void ClientImpl::shutdown() {
         }
     }
 
-    pool_.close();
+    if (producers.size() + consumers.size() > 0) {
+        LOG_DEBUG(producers.size() << " producers and " << consumers.size()
+                                   << " consumers have been shutdown.");
+    }
+    if (!pool_.close()) {
+        // pool_ has already been closed. It means shutdown() has been called before.
+        return;
+    }
+    LOG_DEBUG("ConnectionPool is closed");
     ioExecutorProvider_->close();
+    LOG_DEBUG("ioExecutorProvider_ is closed");
     listenerExecutorProvider_->close();
+    LOG_DEBUG("listenerExecutorProvider_ is closed");
     partitionListenerExecutorProvider_->close();
+    LOG_DEBUG("partitionListenerExecutorProvider_ is closed");
 }
 
 uint64_t ClientImpl::newProducerId() {
diff --git a/pulsar-client-cpp/lib/ConnectionPool.cc b/pulsar-client-cpp/lib/ConnectionPool.cc
index bb4f5c2..e03697f 100644
--- a/pulsar-client-cpp/lib/ConnectionPool.cc
+++ b/pulsar-client-cpp/lib/ConnectionPool.cc
@@ -41,7 +41,12 @@ ConnectionPool::ConnectionPool(const ClientConfiguration& conf, ExecutorServiceP
       poolConnections_(poolConnections),
       mutex_() {}
 
-void ConnectionPool::close() {
+bool ConnectionPool::close() {
+    bool expectedState = false;
+    if (!closed_.compare_exchange_strong(expectedState, true)) {
+        return false;
+    }
+
     std::unique_lock<std::mutex> lock(mutex_);
     if (poolConnections_) {
         for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
@@ -52,10 +57,17 @@ void ConnectionPool::close() {
         }
         pool_.clear();
     }
+    return true;
 }
 
 Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
     const std::string& logicalAddress, const std::string& physicalAddress) {
+    if (closed_) {
+        Promise<Result, ClientConnectionWeakPtr> promise;
+        promise.setFailed(ResultAlreadyClosed);
+        return promise.getFuture();
+    }
+
     std::unique_lock<std::mutex> lock(mutex_);
 
     if (poolConnections_) {
diff --git a/pulsar-client-cpp/lib/ConnectionPool.h b/pulsar-client-cpp/lib/ConnectionPool.h
index 8b35044..5032e80 100644
--- a/pulsar-client-cpp/lib/ConnectionPool.h
+++ b/pulsar-client-cpp/lib/ConnectionPool.h
@@ -24,6 +24,7 @@
 
 #include "ClientConnection.h"
 
+#include <atomic>
 #include <string>
 #include <map>
 #include <mutex>
@@ -36,7 +37,12 @@ class PULSAR_PUBLIC ConnectionPool {
     ConnectionPool(const ClientConfiguration& conf, ExecutorServiceProviderPtr executorProvider,
                    const AuthenticationPtr& authentication, bool poolConnections = true);
 
-    void close();
+    /**
+     * Close the connection pool.
+     *
+     * @return false if it has already been closed.
+     */
+    bool close();
 
     /**
      * Get a connection from the pool.
@@ -65,6 +71,7 @@ class PULSAR_PUBLIC ConnectionPool {
     PoolMap pool_;
     bool poolConnections_;
     std::mutex mutex_;
+    std::atomic_bool closed_{false};
 
     friend class ConnectionPoolTest;
 };
diff --git a/pulsar-client-cpp/lib/ExecutorService.cc b/pulsar-client-cpp/lib/ExecutorService.cc
index f7cb010..4db3112 100644
--- a/pulsar-client-cpp/lib/ExecutorService.cc
+++ b/pulsar-client-cpp/lib/ExecutorService.cc
@@ -62,6 +62,11 @@ DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
 }
 
 void ExecutorService::close() {
+    bool expectedState = false;
+    if (!closed_.compare_exchange_strong(expectedState, true)) {
+        return;
+    }
+
     io_service_->stop();
     work_.reset();
     // Detach the worker thread instead of join to avoid potential deadlock
@@ -95,6 +100,8 @@ ExecutorServicePtr ExecutorServiceProvider::get() {
 }
 
 void ExecutorServiceProvider::close() {
+    Lock lock(mutex_);
+
     for (ExecutorList::iterator it = executors_.begin(); it != executors_.end(); ++it) {
         if (*it != NULL) {
             (*it)->close();
diff --git a/pulsar-client-cpp/lib/ExecutorService.h b/pulsar-client-cpp/lib/ExecutorService.h
index d0ffc23..6746936 100644
--- a/pulsar-client-cpp/lib/ExecutorService.h
+++ b/pulsar-client-cpp/lib/ExecutorService.h
@@ -19,6 +19,7 @@
 #ifndef _PULSAR_EXECUTOR_SERVICE_HEADER_
 #define _PULSAR_EXECUTOR_SERVICE_HEADER_
 
+#include <atomic>
 #include <memory>
 #include <boost/asio.hpp>
 #include <boost/asio/ssl.hpp>
@@ -73,6 +74,8 @@ class PULSAR_PUBLIC ExecutorService : private boost::noncopyable {
      * io_service
      */
     std::thread worker_;
+
+    std::atomic_bool closed_{false};
 };
 
 typedef std::shared_ptr<ExecutorService> ExecutorServicePtr;
diff --git a/pulsar-client-cpp/lib/Producer.cc b/pulsar-client-cpp/lib/Producer.cc
index acd021b..ad60828 100644
--- a/pulsar-client-cpp/lib/Producer.cc
+++ b/pulsar-client-cpp/lib/Producer.cc
@@ -24,7 +24,6 @@
 #include "ProducerImpl.h"
 
 namespace pulsar {
-DECLARE_LOG_OBJECT()
 
 static const std::string EMPTY_STRING;
 
diff --git a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc
index 5c37f48..919536f 100644
--- a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc
+++ b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc
@@ -38,7 +38,17 @@
 #include <boost/property_tree/ptree.hpp>
 namespace ptree = boost::property_tree;
 
+#if defined(__clang__)
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wunknown-warning-option"
+#endif
+
 #include <boost/xpressive/xpressive.hpp>
+
+#if defined(__clang__)
+#pragma clang diagnostic pop
+#endif
+
 #include <boost/archive/iterators/base64_from_binary.hpp>
 #include <boost/archive/iterators/transform_width.hpp>
 
diff --git a/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc b/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc
index 11cc053..b880df3 100644
--- a/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc
+++ b/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc
@@ -27,8 +27,6 @@
 #include <pulsar/Authentication.h>
 #include <boost/exception/all.hpp>
 
-DECLARE_LOG_OBJECT()
-
 using namespace pulsar;
 
 TEST(BinaryLookupServiceTest, basicLookup) {
diff --git a/pulsar-client-cpp/tests/CustomLoggerTest.cc b/pulsar-client-cpp/tests/CustomLoggerTest.cc
index ec83e42..0b4e76a 100644
--- a/pulsar-client-cpp/tests/CustomLoggerTest.cc
+++ b/pulsar-client-cpp/tests/CustomLoggerTest.cc
@@ -56,7 +56,7 @@ TEST(CustomLoggerTest, testCustomLogger) {
         // reset to previous log factory
         Client client("pulsar://localhost:6650", clientConfig);
         client.close();
-        ASSERT_EQ(logLines.size(), 3);
+        ASSERT_EQ(logLines.size(), 7);
         LogUtils::resetLoggerFactory();
     });
     testThread.join();
@@ -65,7 +65,7 @@ TEST(CustomLoggerTest, testCustomLogger) {
     Client client("pulsar://localhost:6650", clientConfig);
     client.close();
     // custom logger didn't get any new lines
-    ASSERT_EQ(logLines.size(), 3);
+    ASSERT_EQ(logLines.size(), 7);
 }
 
 TEST(CustomLoggerTest, testConsoleLoggerFactory) {
diff --git a/pulsar-client-cpp/tests/MessageTest.cc b/pulsar-client-cpp/tests/MessageTest.cc
index 246203c..3c728c9 100644
--- a/pulsar-client-cpp/tests/MessageTest.cc
+++ b/pulsar-client-cpp/tests/MessageTest.cc
@@ -22,8 +22,6 @@
 #include <string>
 #include <lib/LogUtils.h>
 
-DECLARE_LOG_OBJECT()
-
 using namespace pulsar;
 TEST(MessageTest, testMessageContents) {
     MessageBuilder msgBuilder1;
diff --git a/pulsar-client-cpp/tests/ReaderConfigurationTest.cc b/pulsar-client-cpp/tests/ReaderConfigurationTest.cc
index ccbfa2d..8dc60f4 100644
--- a/pulsar-client-cpp/tests/ReaderConfigurationTest.cc
+++ b/pulsar-client-cpp/tests/ReaderConfigurationTest.cc
@@ -22,12 +22,9 @@
  */
 #include <gtest/gtest.h>
 #include <pulsar/Client.h>
-#include <lib/LogUtils.h>
 #include <lib/ReaderImpl.h>
 #include "NoOpsCryptoKeyReader.h"
 
-DECLARE_LOG_OBJECT()
-
 using namespace pulsar;
 
 static const std::string lookupUrl = "pulsar://localhost:6650";