Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/11/19 03:33:05 UTC

[GitHub] [pulsar] hangc0276 commented on a change in pull request #12792: Fixes #6555 Send Key hash range change events to consumers in Key_Sha…

hangc0276 commented on a change in pull request #12792:
URL: https://github.com/apache/pulsar/pull/12792#discussion_r752821409



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
##########
@@ -74,6 +80,7 @@ public void addConsumer(Consumer consumer) throws ConsumerAssignException {
                     }
                 });
             }
+            notifyActiveConsumerChanged();

Review comment:
       Why is `notifyActiveConsumerChanged` called while the write lock is still held?
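
       A rough sketch of the alternative, not the PR's actual code: mutate the state under the write lock, take a snapshot, release the lock, and only then notify. The class and field names below (`LockScopedSelectorSketch`, `notified`, `notifiedUnderWriteLock`) are hypothetical stand-ins, and consumers are modelled as plain strings. Whether notifying outside the lock keeps the event ordering this PR needs would still have to be checked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for a selector; not Pulsar's real class or API.
class LockScopedSelectorSketch {
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final List<String> consumers = new ArrayList<>();

    // Everything that was "sent" to consumers, recorded for illustration.
    final List<String> notified = new ArrayList<>();
    // Set to true if a notification ever runs while this thread holds the write lock.
    boolean notifiedUnderWriteLock = false;

    void addConsumer(String consumer) {
        List<String> snapshot;
        rwLock.writeLock().lock();
        try {
            consumers.add(consumer);
            // Copy the state needed for the notification while it is consistent.
            snapshot = new ArrayList<>(consumers);
        } finally {
            rwLock.writeLock().unlock();
        }
        // The potentially slow notification happens after the lock is released.
        notifyConsumers(snapshot);
    }

    private void notifyConsumers(List<String> snapshot) {
        if (rwLock.isWriteLockedByCurrentThread()) {
            notifiedUnderWriteLock = true;
        }
        notified.addAll(snapshot);
    }
}
```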

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
##########
@@ -465,7 +466,20 @@ protected void handleActiveConsumerChange(CommandActiveConsumerChange change) {
         }
         ConsumerImpl<?> consumer = consumers.get(change.getConsumerId());
         if (consumer != null) {
-            consumer.activeConsumerChanged(change.isIsActive());
+            switch (consumer.getSubType()){
+                case Failover:{
+                    consumer.activeConsumerChanged(change.isIsActive(), null);
+                    break;
+                }
+                case Key_Shared:{
+                    if (StringUtils.isNoneBlank(change.getKeySharedProps())) {
+                        consumer.activeConsumerChanged(true,
+                                StickyKeyConsumerPredicate.decode(change.getKeySharedProps()));
+                    }
+                    break;
+                }
+                default : {}

Review comment:
       Do we need `default: {}`?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
##########
@@ -148,4 +156,29 @@ public Consumer select(int hash) {
     Map<Integer, List<Consumer>> getRangeConsumer() {
         return Collections.unmodifiableMap(hashRing);
     }
+
+    @Override
+    public StickyKeyConsumerPredicate generateSpecialPredicate(final Consumer consumer){
+        NavigableMap<Integer, List<String>> cpHashRing = new ConcurrentSkipListMap<>();
+        for (Map.Entry<Integer, List<Consumer>> entry: hashRing.entrySet()) {
+            if (CollectionUtils.isEmpty(entry.getValue())) {
+                continue;
+            }
+            cpHashRing.put(entry.getKey(), entry.getValue()
+                    .stream()
+                    .map(v -> v == consumer ? StickyKeyConsumerPredicate.SPECIAL_CONSUMER_MARK
+                            : StickyKeyConsumerPredicate.OTHER_CONSUMER_MARK)
+                    .collect(Collectors.toList())
+            );
+        }
+        return new StickyKeyConsumerPredicate.Predicate4ConsistentHashingStickyKeyConsumerSelector(cpHashRing);
+    }
+
+    private void notifyActiveConsumerChanged() {
+        // TODO add configuration for this events in future
+        Set<Consumer> consumerSet = new HashSet<>(hashRing.values().stream()

Review comment:
       This can be replaced with `Set<Consumer> consumerSet = hashRing.values().stream()
               .flatMap(Collection::stream).collect(Collectors.toSet());`
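
       For illustration, the same pattern in a self-contained form. This is only a sketch: `FlattenRingSketch` and `distinctConsumers` are made-up names, and consumers are modelled as strings rather than the broker's `Consumer` type.

```java
import java.util.Collection;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper; the ring maps a hash point to the consumers on it.
class FlattenRingSketch {
    static Set<String> distinctConsumers(NavigableMap<Integer, List<String>> hashRing) {
        // flatMap turns the stream of lists into one stream of consumers;
        // toSet() drops duplicates, so no wrapping HashSet is needed.
        return hashRing.values().stream()
                .flatMap(Collection::stream)
                .collect(Collectors.toSet());
    }
}
```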

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
##########
@@ -99,6 +100,11 @@ public void becameInactive(Consumer<?> consumer, int partitionId) {
             } catch (InterruptedException e) {
             }
         }
+
+        @Override
+        public void keySharedRuleChanged(Consumer<?> consumer, Predicate<String> keyPredicate) {

Review comment:
       You have added the `default` modifier to this method in the interface. Do we still need to override it in the subclass?
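
       To spell out the point: an implementation that does not care about the new callback can simply leave it out and inherit the no-op. A minimal sketch with hypothetical names (`ListenerSketch`, `FailoverOnlyListener`), using strings in place of the real consumer type:

```java
import java.util.function.Predicate;

// Hypothetical listener interface; not the real Pulsar interface.
interface ListenerSketch {
    void becameActive(String consumer);

    // A default body means existing implementations keep compiling unchanged.
    default void keySharedRuleChanged(String consumer, Predicate<String> keyPredicate) {
        // no-op by default
    }
}

// Implements only the abstract method; the default callback is inherited.
class FailoverOnlyListener implements ListenerSketch {
    int activeCount = 0;

    @Override
    public void becameActive(String consumer) {
        activeCount++;
    }
}
```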

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
##########
@@ -96,9 +104,20 @@ public synchronized void removeConsumer(Consumer consumer) {
                 rangeMap.put(removeRange, lowerEntry.getValue());
                 rangeMap.remove(lowerEntry.getKey());
                 consumerRange.put(lowerEntry.getValue(), removeRange);
+                // notify change for effected consumer
+                lowerEntry.getValue().notifyActiveConsumerChange(
+                        generateSpecialPredicate(lowerEntry.getValue()).encode());
             } else {
                 rangeMap.remove(removeRange);
+                // notify change for effected consumer.
+                Map.Entry<Integer, Consumer> lowerEntry = rangeMap.higherEntry(removeRange);
+                if (lowerEntry != null) {
+                    lowerEntry.getValue().notifyActiveConsumerChange(
+                            generateSpecialPredicate(lowerEntry.getValue()).encode());
+                }
             }
+//            // notify removed consumer change

Review comment:
       Please remove this commented-out code.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
##########
@@ -148,4 +156,29 @@ public Consumer select(int hash) {
     Map<Integer, List<Consumer>> getRangeConsumer() {
         return Collections.unmodifiableMap(hashRing);
     }
+
+    @Override
+    public StickyKeyConsumerPredicate generateSpecialPredicate(final Consumer consumer){
+        NavigableMap<Integer, List<String>> cpHashRing = new ConcurrentSkipListMap<>();

Review comment:
       Do we need `ConcurrentSkipListMap` here?
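
       A sketch of what a non-concurrent version could look like, assuming the copied ring is built and read by one thread at a time (which I have not verified for this PR). `LocalRingCopySketch`, `copyRing`, and the `"SELF"`/`"OTHER"` marks are hypothetical; a plain `TreeMap` still provides the `NavigableMap` operations.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical helper; consumers are modelled as strings.
class LocalRingCopySketch {
    static NavigableMap<Integer, List<String>> copyRing(Map<Integer, List<String>> ring, String self) {
        // The copy is local to this call, so a TreeMap is enough here.
        NavigableMap<Integer, List<String>> copy = new TreeMap<>();
        for (Map.Entry<Integer, List<String>> entry : ring.entrySet()) {
            List<String> consumers = entry.getValue();
            if (consumers == null || consumers.isEmpty()) {
                continue; // skip empty hash points
            }
            List<String> marks = new ArrayList<>();
            for (String consumer : consumers) {
                marks.add(consumer.equals(self) ? "SELF" : "OTHER");
            }
            copy.put(entry.getKey(), marks);
        }
        return copy;
    }
}
```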

##########
File path: pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
##########
@@ -643,6 +643,15 @@ public static ByteBuf newActiveConsumerChange(long consumerId, boolean isActive)
         return serializeWithSize(cmd);
     }
 
+    public static ByteBuf newActiveConsumerChange(long consumerId, String keySharedProps) {

Review comment:
       Can we merge this method into the above method?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
##########
@@ -96,9 +104,20 @@ public synchronized void removeConsumer(Consumer consumer) {
                 rangeMap.put(removeRange, lowerEntry.getValue());
                 rangeMap.remove(lowerEntry.getKey());
                 consumerRange.put(lowerEntry.getValue(), removeRange);
+                // notify change for effected consumer
+                lowerEntry.getValue().notifyActiveConsumerChange(
+                        generateSpecialPredicate(lowerEntry.getValue()).encode());
             } else {
                 rangeMap.remove(removeRange);
+                // notify change for effected consumer.
+                Map.Entry<Integer, Consumer> lowerEntry = rangeMap.higherEntry(removeRange);
+                if (lowerEntry != null) {
+                    lowerEntry.getValue().notifyActiveConsumerChange(

Review comment:
       Please simplify the duplicated code.
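
       One possible shape, as a sketch only: pick the affected entry in each branch and notify once at the end. The branch condition is not visible in this hunk, so `lower != null` is an assumption here, as are the names `NotifyNeighbourSketch`, `removeRange`, and `notified`; consumers are modelled as strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

// Hypothetical stand-in for the range bookkeeping in removeConsumer.
class NotifyNeighbourSketch {
    // Records which consumers would have been notified.
    final List<String> notified = new ArrayList<>();

    void removeRange(NavigableMap<Integer, String> rangeMap, int removeRange) {
        Map.Entry<Integer, String> lower = rangeMap.lowerEntry(removeRange);
        Map.Entry<Integer, String> affected;
        if (lower != null) { // assumed condition; the real one is outside the hunk
            // The lower neighbour takes over the removed range.
            rangeMap.put(removeRange, lower.getValue());
            rangeMap.remove(lower.getKey());
            affected = lower;
        } else {
            rangeMap.remove(removeRange);
            // No lower neighbour: the higher neighbour's range grows instead.
            affected = rangeMap.higherEntry(removeRange);
        }
        // Single notification site for both branches.
        if (affected != null) {
            notified.add(affected.getValue());
        }
    }
}
```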

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
##########
@@ -421,7 +421,8 @@ public Clock getClientClock() {
                     "Read compacted can only be used with exclusive or failover persistent subscriptions"));
         }
 
-        if (conf.getConsumerEventListener() != null && conf.getSubscriptionType() != SubscriptionType.Failover) {
+        if (conf.getConsumerEventListener() != null && conf.getSubscriptionType() != SubscriptionType.Failover
+                && conf.getSubscriptionType() != SubscriptionType.Key_Shared) {
             return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(

Review comment:
       Please update the exception message.



