You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2019/03/05 19:50:22 UTC

[pulsar] branch master updated: Broker considers fail-over consumer priority-level (#2954)

This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c39e7d1  Broker considers fail-over consumer priority-level (#2954)
c39e7d1 is described below

commit c39e7d196d80d1b1cecfbc8a15808698d1741089
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Mar 5 11:50:17 2019 -0800

    Broker considers fail-over consumer priority-level (#2954)
    
    add java doc
    
    fix partitioned-topic distribution
---
 .../AbstractDispatcherSingleActiveConsumer.java    | 29 +++++--
 .../client/api/SimpleProducerConsumerTest.java     | 93 ++++++++++++++++++++++
 .../apache/pulsar/client/api/ConsumerBuilder.java  | 20 +++++
 3 files changed, 137 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 4f6452f..5bc6af7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -22,13 +22,12 @@ import static com.google.common.base.Preconditions.checkArgument;
 
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
-import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.utils.CopyOnWriteArrayList;
 import org.slf4j.Logger;
@@ -85,9 +84,29 @@ public abstract class AbstractDispatcherSingleActiveConsumer {
     protected boolean pickAndScheduleActiveConsumer() {
         checkArgument(!consumers.isEmpty());
 
-        consumers.sort((c1, c2) -> c1.consumerName().compareTo(c2.consumerName()));
-
-        int index = partitionIndex % consumers.size();
+        AtomicBoolean hasPriorityConsumer = new AtomicBoolean(false);
+        consumers.sort((c1, c2) -> {
+            int priority = c1.getPriorityLevel() - c2.getPriorityLevel();
+            if (priority != 0) {
+                hasPriorityConsumer.set(true);
+                return priority;
+            }
+            return c1.consumerName().compareTo(c2.consumerName());
+        });
+
+        int consumersSize = consumers.size();
+        // find number of consumers which are having the highest priorities. so partitioned-topic assignment happens
+        // evenly across highest priority consumers
+        if (hasPriorityConsumer.get()) {
+            int highestPriorityLevel = consumers.get(0).getPriorityLevel();
+            for (int i = 0; i < consumers.size(); i++) {
+                if (highestPriorityLevel != consumers.get(i).getPriorityLevel()) {
+                    consumersSize = i;
+                    break;
+                }
+            }
+        }
+        int index = partitionIndex % consumersSize;
         Consumer prevConsumer = ACTIVE_CONSUMER_UPDATER.getAndSet(this, consumers.get(index));
 
         Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 9dd79b2..4a142ce 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.api;
 
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.spy;
@@ -63,6 +64,8 @@ import java.util.stream.Collectors;
 import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageCrypto;
 import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -2897,6 +2900,96 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         consumer.close();
     }
 
+    /**
+     * This test verifies that broker activates fail-over consumer by considering priority-level as well.
+     * 
+     * <pre>
+     * 1. Start two failover consumer with same priority level, broker selects consumer based on name-sorting (consumer1).
+     * 2. Switch non-active consumer to active (consumer2): by giving it higher priority
+     * Partitioned-topic with 9 partitions:
+     * 1. C1 (priority=1)
+     * 2. C2,C3,C4 (priority=0)
+     * So, broker should evenly distribute C2,C3,C4 active consumers among 9 partitions. 
+     * </pre>
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testFailOverConsumerPriority() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final String topicName = "persistent://my-property/my-ns/priority-topic";
+        final String subscriptionName = "my-sub";
+        final int noOfPartitions = 9;
+
+        // create partitioned topic
+        admin.topics().createPartitionedTopic(topicName, noOfPartitions);
+
+        // Only subscribe consumer
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+                .consumerName("aaa").subscriptionType(SubscriptionType.Failover)
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS).priorityLevel(1).subscribe();
+
+        ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName)
+                .subscriptionName(subscriptionName).consumerName("bbb1").subscriptionType(SubscriptionType.Failover)
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS).priorityLevel(1);
+
+        Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
+
+        AtomicInteger consumer1Count = new AtomicInteger(0);
+        admin.topics().getPartitionedStats(topicName, true).partitions.forEach((p, stats) -> {
+            String activeConsumerName = stats.subscriptions.entrySet().iterator().next().getValue().activeConsumerName;
+            if (activeConsumerName.equals("aaa")) {
+                consumer1Count.incrementAndGet();
+            }
+        });
+
+        // validate even distribution among two consumers
+        assertNotEquals(consumer1Count, noOfPartitions);
+
+        consumer2.close();
+        consumer2 = consumerBuilder.priorityLevel(0).subscribe();
+        Consumer<byte[]> consumer3 = consumerBuilder.consumerName("bbb2").priorityLevel(0).subscribe();
+        Consumer<byte[]> consumer4 = consumerBuilder.consumerName("bbb3").priorityLevel(0).subscribe();
+        Consumer<byte[]> consumer5 = consumerBuilder.consumerName("bbb4").priorityLevel(1).subscribe();
+
+        Integer evenDistributionCount = noOfPartitions / 3;
+        retryStrategically((test) -> {
+            try {
+                Map<String, Integer> subsCount = Maps.newHashMap();
+                admin.topics().getPartitionedStats(topicName, true).partitions.forEach((p, stats) -> {
+                    String activeConsumerName = stats.subscriptions.entrySet().iterator().next()
+                            .getValue().activeConsumerName;
+                    subsCount.compute(activeConsumerName, (k, v) -> v != null ? v + 1 : 1);
+                });
+                return subsCount.size() == 3 && subsCount.get("bbb1") == evenDistributionCount
+                        && subsCount.get("bbb2") == evenDistributionCount
+                        && subsCount.get("bbb3") == evenDistributionCount;
+
+            } catch (PulsarAdminException e) {
+                // Ok
+            }
+            return false;
+        }, 5, 100);
+
+        Map<String, Integer> subsCount = Maps.newHashMap();
+        admin.topics().getPartitionedStats(topicName, true).partitions.forEach((p, stats) -> {
+            String activeConsumerName = stats.subscriptions.entrySet().iterator().next().getValue().activeConsumerName;
+            subsCount.compute(activeConsumerName, (k, v) -> v != null ? v + 1 : 1);
+        });
+        assertEquals(subsCount.size(), 3);
+        assertEquals(subsCount.get("bbb1"), evenDistributionCount);
+        assertEquals(subsCount.get("bbb2"), evenDistributionCount);
+        assertEquals(subsCount.get("bbb3"), evenDistributionCount);
+
+        consumer1.close();
+        consumer2.close();
+        consumer3.close();
+        consumer4.close();
+        consumer5.close();
+        log.info("-- Exiting {} test --", methodName);
+    }
+
     // Issue 3226: https://github.com/apache/pulsar/issues/3226
     // Pull 3312: https://github.com/apache/pulsar/pull/3312
     // Bugfix preventing duplicated consumers on same client cnx with shared subscription mode
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 504dd50..09ad5eb 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -348,6 +348,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
     ConsumerBuilder<T> patternAutoDiscoveryPeriod(int periodInMinutes);
 
     /**
+     * <b>Shared subscription</b>
      * Sets priority level for the shared subscription consumers to which broker gives more priority while dispatching
      * messages. Here, broker follows descending priorities. (eg: 0=max-priority, 1, 2,..) </br>
      * In Shared subscription mode, broker will first dispatch messages to max priority-level consumers if they have
@@ -364,6 +365,25 @@ public interface ConsumerBuilder<T> extends Cloneable {
      * C5       1             1
      * Order in which broker dispatches messages to consumers: C1, C2, C3, C1, C4, C5, C4
      * </pre>
+     * 
+     * <b>Failover subscription</b>
+     * Broker selects active consumer for a failover-subscription based on consumer's priority-level and lexicographical sorting of a consumer name.
+     * eg:
+     * <pre>
+     * 1. Active consumer = C1 : Same priority-level and lexicographical sorting
+     * Consumer PriorityLevel Name
+     * C1       0             aaa
+     * C2       0             bbb
+     * 
+     * 2. Active consumer = C2 : Consumer with highest priority
+     * Consumer PriorityLevel Name
+     * C1       1             aaa
+     * C2       0             bbb
+     * 
+     * Partitioned-topics:
+     * Broker evenly assigns partitioned topics to highest priority consumers.
+     * 
+     * </pre>
      *
      * @param priorityLevel the priority of this consumer
      * @return the consumer builder instance