Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/02 11:37:56 UTC

[pulsar] branch master updated: [feature][client] PIP-184: Topic specific consumer priorityLevel (#16715)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cfd2178ca80 [feature][client] PIP-184: Topic specific consumer priorityLevel (#16715)
cfd2178ca80 is described below

commit cfd2178ca80e4b6db89385cd39646f20f1c2c066
Author: Dave Maughan <da...@streamnative.io>
AuthorDate: Tue Aug 2 12:37:49 2022 +0100

    [feature][client] PIP-184: Topic specific consumer priorityLevel (#16715)
---
 .../apache/pulsar/client/api/ConsumerBuilder.java  | 34 +++++++++
 .../pulsar/client/api/TopicConsumerBuilder.java    | 47 ++++++++++++
 .../pulsar/client/impl/ConsumerBuilderImpl.java    | 30 ++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  5 +-
 .../client/impl/TopicConsumerBuilderImpl.java      | 43 +++++++++++
 .../impl/conf/ConsumerConfigurationData.java       | 16 ++++
 .../impl/conf/TopicConsumerConfigurationData.java  | 85 ++++++++++++++++++++++
 .../client/impl/ConsumerBuilderImplTest.java       | 18 +++++
 .../pulsar/client/impl/ConsumerImplTest.java       | 26 ++++++-
 .../client/impl/TopicConsumerBuilderImplTest.java  | 55 ++++++++++++++
 .../impl/conf/ConsumerConfigurationDataTest.java   | 48 ++++++++++++
 .../conf/TopicConsumerConfigurationDataTest.java   | 74 +++++++++++++++++++
 site2/docs/client-libraries-java.md                |  2 +-
 13 files changed, 477 insertions(+), 6 deletions(-)
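
Note on the lookup this commit introduces: ConsumerImpl now takes its priority level from conf.getMatchingTopicConfiguration(topic), which returns the first registered topic configuration whose matcher accepts the topic name and otherwise falls back to the consumer-level priorityLevel. The sketch below illustrates that first-match-wins rule with plain JDK types only. TopicPrioritySketch, TopicOverride, forTopic, forPattern and priorityLevelFor are hypothetical names chosen for illustration; they are not part of the Pulsar client, and the real classes are the ones shown in the diff that follows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Simplified stand-in for the topic-specific priority lookup.
// All names here are hypothetical; this is not the Pulsar API.
public class TopicPrioritySketch {

    // One override: a topic-name matcher plus the priority level to apply.
    static final class TopicOverride {
        final Predicate<String> matcher;
        final int priorityLevel;

        TopicOverride(Predicate<String> matcher, int priorityLevel) {
            this.matcher = matcher;
            this.priorityLevel = priorityLevel;
        }
    }

    private final int defaultPriorityLevel;
    private final List<TopicOverride> overrides = new ArrayList<>();

    public TopicPrioritySketch(int defaultPriorityLevel) {
        this.defaultPriorityLevel = defaultPriorityLevel;
    }

    // Register an override that applies to exactly one topic name.
    public TopicPrioritySketch forTopic(String topicName, int priorityLevel) {
        overrides.add(new TopicOverride(topicName::equals, priorityLevel));
        return this;
    }

    // Register an override for every topic name the whole pattern matches.
    public TopicPrioritySketch forPattern(Pattern pattern, int priorityLevel) {
        overrides.add(new TopicOverride(
                name -> pattern.matcher(name).matches(), priorityLevel));
        return this;
    }

    // The first registered override that matches wins; otherwise the
    // consumer-level default is used.
    public int priorityLevelFor(String topicName) {
        return overrides.stream()
                .filter(o -> o.matcher.test(topicName))
                .findFirst()
                .map(o -> o.priorityLevel)
                .orElse(defaultPriorityLevel);
    }

    public static void main(String[] args) {
        TopicPrioritySketch conf = new TopicPrioritySketch(1)
                .forPattern(Pattern.compile("^foo$"), 2)
                .forTopic("foo", 5); // never reached for "foo": the pattern was added first

        System.out.println(conf.priorityLevelFor("foo")); // prints 2
        System.out.println(conf.priorityLevelFor("bar")); // prints 1
    }
}
```

Because registration order decides which override applies, a more specific entry added after a broader pattern is shadowed by it, as the "foo" case above shows.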

diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index e303e55538e..804aa7097f3 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -813,4 +813,38 @@ public interface ConsumerBuilder<T> extends Cloneable {
      * @param enabled whether to enable AutoScaledReceiverQueueSize.
      */
     ConsumerBuilder<T> autoScaledReceiverQueueSizeEnabled(boolean enabled);
+
+    /**
+     * Configure topic specific options to override those set at the {@link ConsumerBuilder} level.
+     *
+     * @param topicName a topic name
+     * @return a {@link TopicConsumerBuilder} instance
+     */
+    TopicConsumerBuilder<T> topicConfiguration(String topicName);
+
+    /**
+     * Configure topic specific options to override those set at the {@link ConsumerBuilder} level.
+     *
+     * @param topicName a topic name
+     * @param builderConsumer a consumer to allow the configuration of the {@link TopicConsumerBuilder} instance
+     */
+    ConsumerBuilder<T> topicConfiguration(String topicName,
+                                          java.util.function.Consumer<TopicConsumerBuilder<T>> builderConsumer);
+
+    /**
+     * Configure topic specific options to override those set at the {@link ConsumerBuilder} level.
+     *
+     * @param topicsPattern a regular expression to match a topic name
+     * @return a {@link TopicConsumerBuilder} instance
+     */
+    TopicConsumerBuilder<T> topicConfiguration(Pattern topicsPattern);
+
+    /**
+     * Configure topic specific options to override those set at the {@link ConsumerBuilder} level.
+     *
+     * @param topicsPattern a regular expression to match a topic name
+     * @param builderConsumer a consumer to allow the configuration of the {@link TopicConsumerBuilder} instance
+     */
+    ConsumerBuilder<T> topicConfiguration(Pattern topicsPattern,
+                                          java.util.function.Consumer<TopicConsumerBuilder<T>> builderConsumer);
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicConsumerBuilder.java
new file mode 100644
index 00000000000..5096f525776
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicConsumerBuilder.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * {@link TopicConsumerBuilder} is used to configure topic specific options to override those set at the
+ * {@link ConsumerBuilder} level.
+ *
+ * @see ConsumerBuilder#topicConfiguration(String)
+ *
+ * @param <T> the type of the value in the {@link ConsumerBuilder}
+ */
+public interface TopicConsumerBuilder<T> {
+    /**
+     * Configure the priority level of this topic.
+     *
+     * @see ConsumerBuilder#priorityLevel(int)
+     *
+     * @param priorityLevel the priority of this topic
+     * @return the {@link TopicConsumerBuilder} instance
+     */
+    TopicConsumerBuilder<T> priorityLevel(int priorityLevel);
+
+    /**
+     * Complete the configuration of the topic specific options and return control back to the
+     * {@link ConsumerBuilder} instance.
+     *
+     * @return the {@link ConsumerBuilder} instance
+     */
+    ConsumerBuilder<T> build();
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 616ed86abb8..d30e72aa53c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -52,8 +52,10 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TopicConsumerBuilder;
 import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
 import org.apache.pulsar.client.util.RetryMessageUtil;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -537,4 +539,32 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
         conf.setAutoScaledReceiverQueueSizeEnabled(enabled);
         return this;
     }
+
+    @Override
+    public TopicConsumerBuilder<T> topicConfiguration(String topicName) {
+        TopicConsumerConfigurationData topicConf = TopicConsumerConfigurationData.ofTopicName(topicName, conf);
+        conf.getTopicConfigurations().add(topicConf);
+        return new TopicConsumerBuilderImpl<>(this, topicConf);
+    }
+
+    @Override
+    public ConsumerBuilder<T> topicConfiguration(String topicName,
+                                                 java.util.function.Consumer<TopicConsumerBuilder<T>> builderConsumer) {
+        builderConsumer.accept(topicConfiguration(topicName));
+        return this;
+    }
+
+    @Override
+    public TopicConsumerBuilder<T> topicConfiguration(Pattern topicsPattern) {
+        TopicConsumerConfigurationData topicConf = TopicConsumerConfigurationData.ofTopicsPattern(topicsPattern, conf);
+        conf.getTopicConfigurations().add(topicConf);
+        return new TopicConsumerBuilderImpl<>(this, topicConf);
+    }
+
+    @Override
+    public ConsumerBuilder<T> topicConfiguration(Pattern topicsPattern,
+                                                 java.util.function.Consumer<TopicConsumerBuilder<T>> builderConsumer) {
+        builderConsumer.accept(topicConfiguration(topicsPattern));
+        return this;
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 5bc998c4526..c474da345c2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -61,6 +61,8 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import lombok.AccessLevel;
+import lombok.Getter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
@@ -146,6 +148,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     private final NegativeAcksTracker negativeAcksTracker;
 
     protected final ConsumerStatsRecorder stats;
+    @Getter(AccessLevel.PACKAGE)
     private final int priorityLevel;
     private final SubscriptionMode subscriptionMode;
     private volatile BatchMessageIdImpl startMessageId;
@@ -266,7 +269,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         this.partitionIndex = partitionIndex;
         this.hasParentConsumer = hasParentConsumer;
         this.parentConsumerHasListener = parentConsumerHasListener;
-        this.priorityLevel = conf.getPriorityLevel();
+        this.priorityLevel = conf.getMatchingTopicConfiguration(topic).getPriorityLevel();
         this.readCompacted = conf.isReadCompacted();
         this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition();
         this.negativeAcksTracker = new NegativeAcksTracker(this, conf);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImpl.java
new file mode 100644
index 00000000000..33f91366584
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImpl.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import lombok.RequiredArgsConstructor;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.TopicConsumerBuilder;
+import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
+
+@RequiredArgsConstructor
+class TopicConsumerBuilderImpl<T> implements TopicConsumerBuilder<T> {
+    private final ConsumerBuilder<T> consumerBuilder;
+    private final TopicConsumerConfigurationData topicConf;
+
+    @Override
+    public TopicConsumerBuilder<T> priorityLevel(int priorityLevel) {
+        checkArgument(priorityLevel >= 0, "priorityLevel needs to be >= 0");
+        topicConf.setPriorityLevel(priorityLevel);
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder<T> build() {
+        return consumerBuilder;
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 6c22d143a6f..dcde042f4e8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -22,6 +22,8 @@ import static com.google.common.base.Preconditions.checkArgument;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.google.common.collect.Sets;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
@@ -164,6 +166,20 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
 
     private boolean autoScaledReceiverQueueSizeEnabled = false;
 
+    private List<TopicConsumerConfigurationData> topicConfigurations = new ArrayList<>();
+
+    public TopicConsumerConfigurationData getMatchingTopicConfiguration(String topicName) {
+        return topicConfigurations.stream()
+                .filter(topicConf -> topicConf.getTopicNameMatcher().matches(topicName))
+                .findFirst()
+                .orElseGet(() -> TopicConsumerConfigurationData.ofTopicName(topicName, this));
+    }
+
+    public void setTopicConfigurations(List<TopicConsumerConfigurationData> topicConfigurations) {
+        checkArgument(topicConfigurations != null, "topicConfigurations should not be null.");
+        this.topicConfigurations = topicConfigurations;
+    }
+
     public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit timeUnit) {
         checkArgument(interval > 0, "interval needs to be > 0");
         this.autoUpdatePartitionsIntervalSeconds = timeUnit.toSeconds(interval);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationData.java
new file mode 100644
index 00000000000..e6d7a9aa0d7
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationData.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.conf;
+
+import java.io.Serializable;
+import java.util.regex.Pattern;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class TopicConsumerConfigurationData implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private TopicNameMatcher topicNameMatcher;
+    private int priorityLevel;
+
+    public static TopicConsumerConfigurationData ofTopicsPattern(@NonNull Pattern topicsPattern, int priorityLevel) {
+        return of(new TopicNameMatcher.TopicsPattern(topicsPattern), priorityLevel);
+    }
+
+    public static TopicConsumerConfigurationData ofTopicsPattern(@NonNull Pattern topicsPattern,
+                                                                 ConsumerConfigurationData<?> conf) {
+        return ofTopicsPattern(topicsPattern, conf.getPriorityLevel());
+    }
+
+    public static TopicConsumerConfigurationData ofTopicName(@NonNull String topicName, int priorityLevel) {
+        return of(new TopicNameMatcher.TopicName(topicName), priorityLevel);
+    }
+
+    public static TopicConsumerConfigurationData ofTopicName(@NonNull String topicName,
+                                                             ConsumerConfigurationData<?> conf) {
+        return ofTopicName(topicName, conf.getPriorityLevel());
+    }
+
+    static TopicConsumerConfigurationData of(@NonNull TopicNameMatcher topicNameMatcher, int priorityLevel) {
+        return new TopicConsumerConfigurationData(topicNameMatcher, priorityLevel);
+    }
+
+    public interface TopicNameMatcher extends Serializable {
+        boolean matches(String topicName);
+
+        @RequiredArgsConstructor
+        class TopicsPattern implements TopicNameMatcher {
+            @NonNull
+            private final Pattern topicsPattern;
+
+            @Override
+            public boolean matches(String topicName) {
+                return topicsPattern.matcher(topicName).matches();
+            }
+        }
+
+        @RequiredArgsConstructor
+        class TopicName implements TopicNameMatcher {
+            @NonNull
+            private final String topicName;
+
+            @Override
+            public boolean matches(String topicName) {
+                return this.topicName.equals(topicName);
+            }
+        }
+    }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
index ff60caca2c1..32afe69c3d0 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
@@ -18,10 +18,12 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertNotNull;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -31,12 +33,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import org.apache.pulsar.client.api.BatchReceivePolicy;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -338,4 +342,18 @@ public class ConsumerBuilderImplTest {
         consumerBuilderImpl.startPaused(true);
         verify(consumerBuilderImpl.getConf()).setStartPaused(true);
     }
+
+    @Test
+    public void testTopicConsumerBuilder() {
+        List<TopicConsumerConfigurationData> topicConsumerConfigurationDataList = new ArrayList<>();
+        when(consumerBuilderImpl.getConf().getTopicConfigurations()).thenReturn(topicConsumerConfigurationDataList);
+
+        ConsumerBuilder<?> consumerBuilder = consumerBuilderImpl.topicConfiguration(Pattern.compile("foo")).priorityLevel(1).build();
+
+        assertThat(consumerBuilder).isSameAs(consumerBuilderImpl);
+        assertThat(topicConsumerConfigurationDataList).hasSize(1);
+        TopicConsumerConfigurationData topicConsumerConfigurationData = topicConsumerConfigurationDataList.get(0);
+        assertThat(topicConsumerConfigurationData.getTopicNameMatcher().matches("foo")).isTrue();
+        assertThat(topicConsumerConfigurationData.getPriorityLevel()).isEqualTo(1);
+    }
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index fcea9d49070..756c18441b3 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -39,6 +40,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
@@ -47,23 +49,28 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 public class ConsumerImplTest {
+    private final String topic = "non-persistent://tenant/ns1/my-topic";
 
     private ExecutorProvider executorProvider;
     private ExecutorService internalExecutor;
     private ConsumerImpl<byte[]> consumer;
-    private ConsumerConfigurationData consumerConf;
+    private ConsumerConfigurationData<byte[]> consumerConf;
 
     @BeforeMethod(alwaysRun = true)
     public void setUp() {
+        consumerConf = new ConsumerConfigurationData<>();
+        createConsumer(consumerConf);
+    }
+
+    private void createConsumer(ConsumerConfigurationData consumerConf) {
         executorProvider = new ExecutorProvider(1, "ConsumerImplTest");
         internalExecutor = Executors.newSingleThreadScheduledExecutor();
-        consumerConf = new ConsumerConfigurationData<>();
+
         PulsarClientImpl client = ClientTestFixtures.createPulsarClientMock(executorProvider, internalExecutor);
         ClientConfigurationData clientConf = client.getConfiguration();
         clientConf.setOperationTimeoutMs(100);
         clientConf.setStatsIntervalSeconds(0);
-        CompletableFuture<Consumer<ConsumerImpl>> subscribeFuture = new CompletableFuture<>();
-        String topic = "non-persistent://tenant/ns1/my-topic";
+        CompletableFuture<Consumer<byte[]>> subscribeFuture = new CompletableFuture<>();
 
         consumerConf.setSubscriptionName("test-sub");
         consumer = ConsumerImpl.newConsumerImpl(client, topic, consumerConf,
@@ -239,4 +246,15 @@ public class ConsumerImplTest {
         Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), size + 100);
         Assert.assertEquals(consumer.getAvailablePermits(), permits + 100);
     }
+
+    @Test
+    public void testTopicPriorityLevel() {
+        ConsumerConfigurationData<Object> consumerConf = new ConsumerConfigurationData<>();
+        consumerConf.getTopicConfigurations().add(
+                TopicConsumerConfigurationData.ofTopicName(topic, 1));
+
+        createConsumer(consumerConf);
+
+        assertThat(consumer.getPriorityLevel()).isEqualTo(1);
+    }
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImplTest.java
new file mode 100644
index 00000000000..cfc2380cebe
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImplTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TopicConsumerBuilderImplTest {
+    private TopicConsumerConfigurationData topicConsumerConfigurationData;
+    private TopicConsumerBuilderImpl<String> topicConsumerBuilderImpl;
+
+    @SuppressWarnings("unchecked")
+    @BeforeMethod(alwaysRun = true)
+    public void setup() {
+        ConsumerBuilder<String> consumerBuilder = mock(ConsumerBuilder.class);
+        topicConsumerConfigurationData = mock(TopicConsumerConfigurationData.class);
+        topicConsumerBuilderImpl = new TopicConsumerBuilderImpl<>(consumerBuilder, topicConsumerConfigurationData);
+    }
+
+    @Test
+    public void testInvalidPriorityLevel() {
+        assertThatIllegalArgumentException()
+                .isThrownBy(() -> topicConsumerBuilderImpl.priorityLevel(-1));
+        verify(topicConsumerConfigurationData, never()).setPriorityLevel(anyInt());
+    }
+
+    @Test
+    public void testValidPriorityLevel() {
+        topicConsumerBuilderImpl.priorityLevel(0);
+        verify(topicConsumerConfigurationData).setPriorityLevel(0);
+    }
+}
\ No newline at end of file
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationDataTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationDataTest.java
new file mode 100644
index 00000000000..0ec031d2505
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationDataTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.conf;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import java.util.regex.Pattern;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class ConsumerConfigurationDataTest {
+    @DataProvider(name = "topicConf")
+    public Object[][] topicConf() {
+        return new Object[][] {
+                new Object[] {"foo", 2},
+                new Object[] {"bar", 1}
+        };
+    }
+
+    @Test(dataProvider = "topicConf")
+    public void testTopicConsumerConfigurationData(String topicName, int expectedPriority) {
+        ConsumerConfigurationData<String> consumerConfigurationData = new ConsumerConfigurationData<>();
+        consumerConfigurationData.setPriorityLevel(1);
+
+        consumerConfigurationData.getTopicConfigurations()
+                .add(TopicConsumerConfigurationData.ofTopicsPattern(Pattern.compile("^foo$"), 2));
+
+        TopicConsumerConfigurationData topicConsumerConfigurationData =
+                consumerConfigurationData.getMatchingTopicConfiguration(topicName);
+
+        assertThat(topicConsumerConfigurationData.getPriorityLevel()).isEqualTo(expectedPriority);
+    }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationDataTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationDataTest.java
new file mode 100644
index 00000000000..a2bea68d1ac
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationDataTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.conf;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNullPointerException;
+import java.util.regex.Pattern;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class TopicConsumerConfigurationDataTest {
+    @Test
+    public void testOfFactoryMethod() {
+        TopicConsumerConfigurationData topicConsumerConfigurationData = TopicConsumerConfigurationData
+                .ofTopicName("foo", 1);
+
+        assertThat(topicConsumerConfigurationData.getTopicNameMatcher().matches("foo")).isTrue();
+        assertThat(topicConsumerConfigurationData.getPriorityLevel()).isEqualTo(1);
+    }
+
+    @Test
+    public void testOfDefaultFactoryMethod() {
+        ConsumerConfigurationData<Object> consumerConfigurationData = new ConsumerConfigurationData<>();
+        consumerConfigurationData.setPriorityLevel(1);
+        TopicConsumerConfigurationData topicConsumerConfigurationData = TopicConsumerConfigurationData
+                .ofTopicName("foo", consumerConfigurationData);
+
+        assertThat(topicConsumerConfigurationData.getTopicNameMatcher().matches("foo")).isTrue();
+        assertThat(topicConsumerConfigurationData.getPriorityLevel()).isEqualTo(1);
+    }
+
+    @DataProvider(name = "topicNameMatch")
+    public Object[][] topicNameMatch() {
+        return new Object[][] {
+                new Object[] {"foo", true},
+                new Object[] {"bar", false}
+        };
+    }
+
+    @Test(dataProvider = "topicNameMatch")
+    public void testTopicNameMatch(String topicName, boolean expectedMatch) {
+        TopicConsumerConfigurationData topicConsumerConfigurationData = TopicConsumerConfigurationData
+                .ofTopicsPattern(Pattern.compile("^foo$"), 1);
+        assertThat(topicConsumerConfigurationData.getTopicNameMatcher().matches(topicName)).isEqualTo(expectedMatch);
+    }
+
+    @Test
+    public void testNullTopicsPattern() {
+        assertThatNullPointerException()
+                .isThrownBy(() -> TopicConsumerConfigurationData.ofTopicsPattern(null, 1));
+    }
+
+    @Test
+    public void testTopicNameMatchNullTopicName() {
+        assertThat(TopicConsumerConfigurationData
+                .ofTopicName("foo", 1).getTopicNameMatcher().matches(null)).isFalse();
+    }
+}
diff --git a/site2/docs/client-libraries-java.md b/site2/docs/client-libraries-java.md
index 6e72d7fdae2..670736ad3ad 100644
--- a/site2/docs/client-libraries-java.md
+++ b/site2/docs/client-libraries-java.md
@@ -729,7 +729,7 @@ When you create a consumer, you can use the `loadConf` configuration. The follow
 `consumerName`|String|Consumer name|null
 `ackTimeoutMillis`|long|Timeout of unacked messages|0
 `tickDurationMillis`|long|Granularity of the ack-timeout redelivery.<br /><br />Using an higher `tickDurationMillis` reduces the memory overhead to track messages when setting ack-timeout to a bigger value (for example, 1 hour).|1000
-`priorityLevel`|int|Priority level for a consumer to which a broker gives more priority while dispatching messages in Shared subscription type. <br /><br />The broker follows descending priorities. For example, 0=max-priority, 1, 2,...<br /><br />In Shared subscription type, the broker **first dispatches messages to the max priority level consumers if they have permits**. Otherwise, the broker considers next priority level consumers.<br /><br /> **Example 1**<br />If a subscription has c [...]
+`priorityLevel`|int|Priority level for a consumer to which a broker gives more priority while dispatching messages in Shared subscription type. It can be set at the consumer level so all topics being consumed will have the same priority level or each topic being consumed can be given a different priority level.<br /><br />The broker follows descending priorities. For example, 0=max-priority, 1, 2,...<br /><br />In Shared subscription type, the broker **first dispatches messages to the ma [...]
 `cryptoFailureAction`|ConsumerCryptoFailureAction|Consumer should take action when it receives a message that can not be decrypted.<br /><li>**FAIL**: this is the default option to fail messages until crypto succeeds.</li><li> **DISCARD**:silently acknowledge and not deliver message to an application.</li><li>**CONSUME**: deliver encrypted messages to applications. It is the application's responsibility to decrypt the message.</li><br />The decompression of message fails. <br /><br />If  [...]
 `properties`|SortedMap<String, String>|A name or value property of this consumer.<br /><br />`properties` is application defined metadata attached to a consumer. <br /><br />When getting a topic stats, associate this metadata with the consumer stats for easier identification.|new TreeMap()
 `readCompacted`|boolean|If enabling `readCompacted`, a consumer reads messages from a compacted topic rather than reading a full message backlog of a topic.<br /><br /> A consumer only sees the latest value for each key in the compacted topic, up until reaching the point in the topic message when compacting backlog. Beyond that point, send messages as normal.<br /><br />Only enabling `readCompacted` on subscriptions to persistent topics, which have a single active consumer (like failure  [...]