You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2019/03/05 19:50:22 UTC
[pulsar] branch master updated: Broker considers fail-over consumer
priority-level (#2954)
This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c39e7d1 Broker considers fail-over consumer priority-level (#2954)
c39e7d1 is described below
commit c39e7d196d80d1b1cecfbc8a15808698d1741089
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Mar 5 11:50:17 2019 -0800
Broker considers fail-over consumer priority-level (#2954)
add java doc
fix partitioned-topic distribution
---
.../AbstractDispatcherSingleActiveConsumer.java | 29 +++++--
.../client/api/SimpleProducerConsumerTest.java | 93 ++++++++++++++++++++++
.../apache/pulsar/client/api/ConsumerBuilder.java | 20 +++++
3 files changed, 137 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 4f6452f..5bc6af7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -22,13 +22,12 @@ import static com.google.common.base.Preconditions.checkArgument;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
-import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.utils.CopyOnWriteArrayList;
import org.slf4j.Logger;
@@ -85,9 +84,29 @@ public abstract class AbstractDispatcherSingleActiveConsumer {
protected boolean pickAndScheduleActiveConsumer() {
checkArgument(!consumers.isEmpty());
- consumers.sort((c1, c2) -> c1.consumerName().compareTo(c2.consumerName()));
-
- int index = partitionIndex % consumers.size();
+ AtomicBoolean hasPriorityConsumer = new AtomicBoolean(false);
+ consumers.sort((c1, c2) -> {
+ int priority = c1.getPriorityLevel() - c2.getPriorityLevel();
+ if (priority != 0) {
+ hasPriorityConsumer.set(true);
+ return priority;
+ }
+ return c1.consumerName().compareTo(c2.consumerName());
+ });
+
+ int consumersSize = consumers.size();
+ // Count the consumers that share the highest priority level, so that partitioned-topic assignment
+ // is spread evenly across only those highest-priority consumers.
+ if (hasPriorityConsumer.get()) {
+ int highestPriorityLevel = consumers.get(0).getPriorityLevel();
+ for (int i = 0; i < consumers.size(); i++) {
+ if (highestPriorityLevel != consumers.get(i).getPriorityLevel()) {
+ consumersSize = i;
+ break;
+ }
+ }
+ }
+ int index = partitionIndex % consumersSize;
Consumer prevConsumer = ACTIVE_CONSUMER_UPDATER.getAndSet(this, consumers.get(index));
Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 9dd79b2..4a142ce 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;
+import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.spy;
@@ -63,6 +64,8 @@ import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageCrypto;
import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -2897,6 +2900,96 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
consumer.close();
}
+ /**
+ * Verifies that the broker takes the priority-level into account when it activates a fail-over consumer.
+ *
+ * <pre>
+ * 1. Start two failover consumers with the same priority level: the broker orders them by name (consumer1 first).
+ * 2. Let a non-active consumer (consumer2) become active by giving it a higher priority (lower level value).
+ * Partitioned-topic with 9 partitions:
+ * 1. C1 (priority=1)
+ * 2. C2,C3,C4 (priority=0)
+ * So, broker should evenly distribute C2,C3,C4 active consumers among 9 partitions.
+ * </pre>
+ *
+ * @throws Exception if topic creation, subscription or the admin stats lookup fails
+ */
+ @Test
+ public void testFailOverConsumerPriority() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ final String topicName = "persistent://my-property/my-ns/priority-topic";
+ final String subscriptionName = "my-sub";
+ final int noOfPartitions = 9;
+
+ // create partitioned topic
+ admin.topics().createPartitionedTopic(topicName, noOfPartitions);
+
+ // Only subscribe consumer
+ Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+ .consumerName("aaa").subscriptionType(SubscriptionType.Failover)
+ .acknowledgmentGroupTime(0, TimeUnit.SECONDS).priorityLevel(1).subscribe();
+
+ ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName)
+ .subscriptionName(subscriptionName).consumerName("bbb1").subscriptionType(SubscriptionType.Failover)
+ .acknowledgmentGroupTime(0, TimeUnit.SECONDS).priorityLevel(1);
+
+ Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
+
+ AtomicInteger consumer1Count = new AtomicInteger(0);
+ admin.topics().getPartitionedStats(topicName, true).partitions.forEach((p, stats) -> {
+ String activeConsumerName = stats.subscriptions.entrySet().iterator().next().getValue().activeConsumerName;
+ if (activeConsumerName.equals("aaa")) {
+ consumer1Count.incrementAndGet();
+ }
+ });
+
+ // both consumers share priority 1, so "aaa" must not be the active consumer on every partition
+ assertNotEquals(consumer1Count.get(), noOfPartitions);
+
+ consumer2.close();
+ consumer2 = consumerBuilder.priorityLevel(0).subscribe();
+ Consumer<byte[]> consumer3 = consumerBuilder.consumerName("bbb2").priorityLevel(0).subscribe();
+ Consumer<byte[]> consumer4 = consumerBuilder.consumerName("bbb3").priorityLevel(0).subscribe();
+ Consumer<byte[]> consumer5 = consumerBuilder.consumerName("bbb4").priorityLevel(1).subscribe();
+
+ Integer evenDistributionCount = noOfPartitions / 3;
+ retryStrategically((test) -> {
+ try {
+ Map<String, Integer> subsCount = Maps.newHashMap();
+ admin.topics().getPartitionedStats(topicName, true).partitions.forEach((p, stats) -> {
+ String activeConsumerName = stats.subscriptions.entrySet().iterator().next()
+ .getValue().activeConsumerName;
+ subsCount.merge(activeConsumerName, 1, Integer::sum);
+ });
+ return subsCount.size() == 3 && evenDistributionCount.equals(subsCount.get("bbb1"))
+ && evenDistributionCount.equals(subsCount.get("bbb2"))
+ && evenDistributionCount.equals(subsCount.get("bbb3"));
+
+ } catch (PulsarAdminException e) {
+ // stats lookup failed on this attempt: fall through and report the condition as not yet met
+ }
+ return false;
+ }, 5, 100);
+
+ Map<String, Integer> subsCount = Maps.newHashMap();
+ admin.topics().getPartitionedStats(topicName, true).partitions.forEach((p, stats) -> {
+ String activeConsumerName = stats.subscriptions.entrySet().iterator().next().getValue().activeConsumerName;
+ subsCount.merge(activeConsumerName, 1, Integer::sum);
+ });
+ assertEquals(subsCount.size(), 3);
+ assertEquals(subsCount.get("bbb1"), evenDistributionCount);
+ assertEquals(subsCount.get("bbb2"), evenDistributionCount);
+ assertEquals(subsCount.get("bbb3"), evenDistributionCount);
+
+ consumer1.close();
+ consumer2.close();
+ consumer3.close();
+ consumer4.close();
+ consumer5.close();
+ log.info("-- Exiting {} test --", methodName);
+ }
+
// Issue 3226: https://github.com/apache/pulsar/issues/3226
// Pull 3312: https://github.com/apache/pulsar/pull/3312
// Bugfix preventing duplicated consumers on same client cnx with shared subscription mode
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 504dd50..09ad5eb 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -348,6 +348,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
ConsumerBuilder<T> patternAutoDiscoveryPeriod(int periodInMinutes);
/**
+ * <b>Shared subscription</b>
* Sets priority level for the shared subscription consumers to which broker gives more priority while dispatching
* messages. Here, broker follows descending priorities. (eg: 0=max-priority, 1, 2,..) </br>
* In Shared subscription mode, broker will first dispatch messages to max priority-level consumers if they have
@@ -364,6 +365,25 @@ public interface ConsumerBuilder<T> extends Cloneable {
* C5 1 1
* Order in which broker dispatches messages to consumers: C1, C2, C3, C1, C4, C5, C4
* </pre>
+ *
+ * <b>Failover subscription</b>
+ * The broker selects the active consumer for a failover subscription by the consumer's priority-level first and, among consumers with the same priority-level, by the lexicographical order of the consumer name.
+ * eg:
+ * <pre>
+ * 1. Active consumer = C1 : Same priority-level and lexicographical sorting
+ * Consumer PriorityLevel Name
+ * C1 0 aaa
+ * C2 0 bbb
+ *
+ * 2. Active consumer = C2 : Consumer with highest priority
+ * Consumer PriorityLevel Name
+ * C1 1 aaa
+ * C2 0 bbb
+ *
+ * Partitioned-topics:
+ * The broker evenly assigns the partitions of a partitioned topic among the highest-priority consumers.
+ *
+ * </pre>
*
* @param priorityLevel the priority of this consumer
* @return the consumer builder instance