Posted to dev@pulsar.apache.org by Dave Maughan <da...@streamnative.io.INVALID> on 2022/07/08 12:59:31 UTC

[DISCUSS] PIP-184: Topic specific consumer priorityLevel

Hi Pulsar community,

I've opened a PIP to discuss a new Java client option to allow setting
topic specific consumer priorityLevel.

Proposal Link: https://github.com/apache/pulsar/issues/16481

---

## Motivation

The Pulsar Java consumer supports setting a priority level for priority message
dispatch in shared subscription consumers and priority assignment in failover
subscription consumers. See the
[ConsumerBuilder.html#priorityLevel(int) Javadoc](https://javadoc.io/static/org.apache.pulsar/pulsar-client-api/2.10.1/org/apache/pulsar/client/api/ConsumerBuilder.html#priorityLevel(int))
for a detailed functional description. The Pulsar Java consumer also supports
consuming from multiple topics. However, it is not possible to set a different
priority level for different topics in the same Consumer instance.

This behaviour is desirable in some use cases. For example, a consumer
processing region specific topics might wish to configure region stickiness.
A multi-region application might be consuming from topics events-us-east-1 and
events-eu-west-1. Consumers in all regions should be configured to consume all
topics to ensure data completeness. However, to ensure low latency, the
us-east-1 consumer would need to set a higher priority for the us-east-1 topic
(that is, a numerically lower `priorityLevel`, since 0 is the highest
priority). Similarly, the eu-west-1 consumer would need to set a higher
priority for the eu-west-1 topic.

## Goal

Update the Java client API to allow the configuration of different priority
levels for different topics.

Do so in such a way that supports the addition of other topic specific
configuration options or overrides in the future.

Issues will be created to track feature parity in the other client
implementations for this PIP.

## API Changes

In pulsar-client-api, update `ConsumerBuilder` to include two new methods:

```java
public interface ConsumerBuilder<T> extends Cloneable {
    // ...

    TopicConsumerBuilder<T> topicConfiguration(String topicNameOrPattern);

    ConsumerBuilder<T> topicConfiguration(String topicNameOrPattern,
            java.util.function.Consumer<TopicConsumerBuilder<T>> builderConsumer);
}
```

Create a new interface:

```java
public interface TopicConsumerBuilder<T> {
    TopicConsumerBuilder<T> priorityLevel(int priorityLevel);

    ConsumerBuilder<T> build();
}
```

In pulsar-client-original, update `ConsumerConfigurationData` to include a
new field:

```java
@Data
public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
    // ...

    private List<TopicConsumerConfigurationData> topicConfigurations = new ArrayList<>();
}
```

Create a topic configuration class:

```java
@Data
@NoArgsConstructor
@AllArgsConstructor // needed by of(...) below; @Data alone does not generate it
public class TopicConsumerConfigurationData implements Serializable {
    private static final long serialVersionUID = 1L;

    private String topicNameOrPattern;
    private int priorityLevel;

    // Full-match semantics: the whole topic name must match the pattern.
    public boolean matchesTopicName(String topicName) {
        return Pattern.compile(topicNameOrPattern).matcher(topicName).matches();
    }

    // Fallback configuration that inherits the consumer-level priority.
    public static TopicConsumerConfigurationData of(String topicNameOrPattern,
            ConsumerConfigurationData<?> conf) {
        return new TopicConsumerConfigurationData(topicNameOrPattern, conf.getPriorityLevel());
    }
}
```

Then, in `ConsumerImpl` the appropriate topic configuration can be selected
based on the topic being subscribed to. Since the topic configuration is
effectively keyed by a topic name or pattern, the user can configure multiple
topic configurations that match the same concrete topic name. In this case the
first matching topic configuration should be selected.

```java
TopicConsumerConfigurationData getMatchingTopicConfiguration(String topicName,
        ConsumerConfigurationData<?> conf) {
    return conf.getTopicConfigurations().stream()
        .filter(topicConf -> topicConf.matchesTopicName(topicName))
        .findFirst()
        .orElseGet(() -> TopicConsumerConfigurationData.of(topicName, conf));
}
```
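
The first-match rule can be sketched in isolation as follows. This is only an
illustration, not the `ConsumerImpl` code: `TopicOverride` and `FirstMatchDemo`
are hypothetical names, patterns are compiled up front for brevity, and the
second override is deliberately written to overlap with the first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical stand-in for a per-topic configuration entry.
final class TopicOverride {
    final Pattern pattern;
    final int priorityLevel;

    TopicOverride(String regex, int priorityLevel) {
        this.pattern = Pattern.compile(regex);
        this.priorityLevel = priorityLevel;
    }

    // Full match: the whole topic name must match the pattern.
    boolean matches(String topicName) {
        return pattern.matcher(topicName).matches();
    }
}

final class FirstMatchDemo {
    // Returns the priority level of the first override (in list order) that
    // matches, or the consumer-wide default when none match.
    static int resolvePriority(List<TopicOverride> overrides, int defaultLevel,
            String topicName) {
        return overrides.stream()
                .filter(o -> o.matches(topicName))
                .findFirst()
                .map(o -> o.priorityLevel)
                .orElse(defaultLevel);
    }

    public static void main(String[] args) {
        List<TopicOverride> overrides = new ArrayList<>();
        overrides.add(new TopicOverride(".*us-east-1", 0));
        overrides.add(new TopicOverride("events.*", 2)); // also matches events-us-east-1

        System.out.println(resolvePriority(overrides, 1, "events-us-east-1")); // 0
        System.out.println(resolvePriority(overrides, 1, "events-eu-west-1")); // 2
        System.out.println(resolvePriority(overrides, 1, "audit-log"));        // 1
    }
}
```

Because the list order decides which configuration wins, users would need to
register more specific patterns before more general ones.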

Example Usage:

```java
pulsarClient.newConsumer()
    .topicsPattern("events.*")
    .priorityLevel(1)
    .topicConfiguration(".*us-east-1", b -> b.priorityLevel(0))
    .subscribe();
```

or

```java
pulsarClient.newConsumer()
    .topicsPattern("events.*")
    .priorityLevel(1)
    .topicConfiguration(".*us-east-1")
        .priorityLevel(0)
        .build()
    .subscribe();
```
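
For illustration, the two call styles above could be wired together roughly as
follows. This is a simplified sketch under assumptions, not the proposed
implementation: `SketchConsumerBuilder`, `SketchTopicBuilder` and
`addTopicConfiguration` are hypothetical names, only the priority level is
modelled, and the nested builder is assumed to start from the parent's level
at the time it is created.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for ConsumerBuilder (not the Pulsar class).
final class SketchConsumerBuilder {
    private int priorityLevel = 0;
    // Insertion-ordered, so overlapping patterns can be resolved first-match-wins.
    private final Map<String, Integer> topicPriorityLevels = new LinkedHashMap<>();

    SketchConsumerBuilder priorityLevel(int priorityLevel) {
        this.priorityLevel = priorityLevel;
        return this;
    }

    // Chained style: hand out a nested builder whose build() returns this builder.
    SketchTopicBuilder topicConfiguration(String topicNameOrPattern) {
        return new SketchTopicBuilder(this, topicNameOrPattern, priorityLevel);
    }

    // Lambda style: configure the nested builder in place, then keep chaining.
    SketchConsumerBuilder topicConfiguration(String topicNameOrPattern,
            Consumer<SketchTopicBuilder> builderConsumer) {
        SketchTopicBuilder nested = topicConfiguration(topicNameOrPattern);
        builderConsumer.accept(nested);
        return nested.build();
    }

    void addTopicConfiguration(String topicNameOrPattern, int priorityLevel) {
        // Keep the first registration if the same pattern is added twice.
        topicPriorityLevels.putIfAbsent(topicNameOrPattern, priorityLevel);
    }

    Map<String, Integer> topicPriorityLevels() {
        return topicPriorityLevels;
    }
}

// Hypothetical stand-in for TopicConsumerBuilder.
final class SketchTopicBuilder {
    private final SketchConsumerBuilder parent;
    private final String topicNameOrPattern;
    private int priorityLevel;

    SketchTopicBuilder(SketchConsumerBuilder parent, String topicNameOrPattern,
            int inheritedPriorityLevel) {
        this.parent = parent;
        this.topicNameOrPattern = topicNameOrPattern;
        this.priorityLevel = inheritedPriorityLevel;
    }

    SketchTopicBuilder priorityLevel(int priorityLevel) {
        this.priorityLevel = priorityLevel;
        return this;
    }

    // Registers the override on the parent and returns to the parent's chain.
    SketchConsumerBuilder build() {
        parent.addTopicConfiguration(topicNameOrPattern, priorityLevel);
        return parent;
    }
}

final class NestedBuilderDemo {
    public static void main(String[] args) {
        SketchConsumerBuilder chained = new SketchConsumerBuilder()
                .priorityLevel(1)
                .topicConfiguration(".*us-east-1")
                    .priorityLevel(0)
                    .build();
        SketchConsumerBuilder lambda = new SketchConsumerBuilder()
                .priorityLevel(1)
                .topicConfiguration(".*us-east-1", b -> b.priorityLevel(0));

        System.out.println(chained.topicPriorityLevels()); // {.*us-east-1=0}
        System.out.println(lambda.topicPriorityLevels());  // {.*us-east-1=0}
    }
}
```

Having the lambda overload delegate to the chained one keeps both styles on a
single registration path.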

## Rejected Alternatives

* Extend the existing `ConsumerBuilder` rather than introducing a nested,
topic specific builder class.

Rejection reason: Does not provide a clear API to discover and extend other
topic specific configuration options and overrides.

```java
public interface ConsumerBuilder<T> extends Cloneable {
    // ...

    ConsumerBuilder<T> topicPriorityLevel(String topicNameOrPattern, int priorityLevel);
}
```

Example usage:

```java
pulsarClient.newConsumer()
    .topicsPattern("events.*")
    .priorityLevel(1)
    .topicPriorityLevel(".*us-east-1", 0)
    .subscribe();
```

* Provide a configurator interface to configure options and overrides at
runtime

Rejection reason: Not compatible with `ConsumerBuilder.loadConf`.

```java
@Data
class TopicConsumerConfigurationData {
    private int priorityLevel;
}
```

```java
interface TopicConsumerConfigurator extends Serializable {
    void configure(String topicName, TopicConsumerConfigurationData topicConf);
}
```

```java
public interface ConsumerBuilder<T> extends Cloneable {
    // ...

    ConsumerBuilder<T> topicConfigurator(TopicConsumerConfigurator configurator);
}
```

Example usage:

```java
pulsarClient.newConsumer()
    .topicsPattern("events.*")
    .priorityLevel(1)
    .topicConfigurator((topicName, topicConf) -> {
        if (topicName.endsWith("us-east-1")) {
            topicConf.setPriorityLevel(0);
        }
    })
    .subscribe();
```

Re: [DISCUSS] PIP-184: Topic specific consumer priorityLevel

Posted by PengHui Li <pe...@apache.org>.
+1

It allows users to have a single consumer with different priority levels for
different topics.

Penghui

On Tue, Jul 12, 2022 at 4:18 PM Dave Maughan
<da...@streamnative.io.invalid> wrote:

> Hi Zike,
>
> > What about using the Pattern type to store the compiled Pattern in the
> > TopicConsumerConfigurationData?
>
> Thanks for the suggestion - yes that's a good idea. I'll update the doc.
>
> - Dave
>
>
> On Mon, Jul 11, 2022 at 12:04 PM Zike Yang <zi...@apache.org> wrote:
>
> > Hi, Dave
> >
> > Thanks for your proposals. Overall looks good to me. Just a minor comment:
> >
> > What about using the Pattern type to store the compiled Pattern in the
> > TopicConsumerConfigurationData? Like below:
> > ```
> > public class TopicConsumerConfigurationData implements Serializable {
> >     private static final long serialVersionUID = 1L;
> >
> >     private Pattern topicsPattern;
> >     private int priorityLevel;
> > }
> > ```
> > Just like what we did for the topicsPattern in the
> > ConsumerConfigurationData. [0]
> >
> > [0] https://github.com/apache/pulsar/blob/bbf2a47867ff54327c1f2940e72f08a44a5dc5f7/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java#L58
> >
> > Zike Yang
> >
> >
> > On Sat, Jul 9, 2022 at 12:10 AM Dave Maughan
> > <da...@streamnative.io.invalid> wrote:
> > >
> > > Hi Enrico,
> > >
> > > > Why can't you create multiple independent Consumers ?
> > > > They will share the same resources (memory pools, thread pools)
> > >
> > > You can do that but it means the user has to manage the consumption from
> > > each of the consumer instances manually. This is already solved in the
> > > existing multi topic consumer. This change adds user convenience - to
> > > continue consuming from multiple topics in a single consumer instance, but
> > > allowing them to customise the configuration.
> > >
> > > - Dave
> > >
> > > On Fri, Jul 8, 2022 at 4:26 PM Enrico Olivelli <eo...@gmail.com> wrote:
> > >
> > > > Dave,
> > > > Why can't you create multiple independent Consumers ?
> > > > They will share the same resources (memory pools, thread pools)
> > > >
> > > > Enrico
> > > >
> > > > Il giorno ven 8 lug 2022 alle ore 15:00 Dave Maughan
> > > > <da...@streamnative.io.invalid> ha scritto:
> > > > >
> > > > > Hi Pulsar community,
> > > > >
> > > > > I've opened a PIP to discuss a new Java client option to allow
> > setting
> > > > > topic specific consumer priorityLevel.
> > > > >
> > > > > Proposal Link: https://github.com/apache/pulsar/issues/16481
> > > > >
> > > > > ---
> > > > >
> > > > > ## Motivation
> > > > >
> > > > > The Pulsar Java consumer supports setting a priority level for
> > priority
> > > > > message
> > > > > dispatch in shared subscription consumers and priority assignment
> in
> > > > > failover
> > > > > subscription consumers. See the
> > [ConsumerBuilder.html#priorityLevel(int)
> > > > > Javadoc](
> > > > >
> > > >
> >
> https://javadoc.io/static/org.apache.pulsar/pulsar-client-api/2.10.1/org/apache/pulsar/client/api/ConsumerBuilder.html#priorityLevel(int)
> > > > > )
> > > > > for a detailed functional description. The Pulsar Java consumer
> also
> > > > > supports
> > > > > consuming from multiple topics. However, it is not possible to set
> a
> > > > > different
> > > > > priority level for different topics in the same Consumer instance.
> > > > >
> > > > > This behaviour is desirable in some use cases. For example, a
> > consumer
> > > > > processing region specific topics might wish to configure region
> > > > stickiness
> > > > > - A
> > > > > multi-region application might be consuming from topics
> > events-us-east-1
> > > > and
> > > > > events-eu-west-1. Consumers in all regions should be configured to
> > > > consume
> > > > > all
> > > > > topics to ensure data completeness. However, to ensure low latency,
> > the
> > > > > us-east-1 consumer would need to set a higher priority level for
> the
> > > > > us-east-1
> > > > > topic. Similarly, the eu-west-1 consumer would need to set a higher
> > > > priority
> > > > > level for the eu-west-1 topic.
> > > > >
> > > > > ## Goal
> > > > >
> > > > > Update the Java client API to allow the configuration of different
> > > > priority
> > > > > levels for different topics.
> > > > >
> > > > > Do so in such a way that supports the addition of other topic
> > specific
> > > > > configuration options or overrides in the future.
> > > > >
> > > > > Issues will be created to track feature parity in the other client
> > > > > implementations for this PIP.
> > > > >
> > > > > ## API Changes
> > > > >
> > > > > In pulsar-client-api, update `ConsumerBuilder` to include two new
> > > > methods:
> > > > >
> > > > > ```java
> > > > > public interface ConsumerBuilder<T> extends Cloneable {
> > > > >     ...
> > > > >
> > > > >     TopicConsumerBuilder<T> topicConfiguration(String
> > > > topicNameOrPattern);
> > > > >
> > > > >     ConsumerBuilder<T> topicConfiguration(String
> topicNameOrPattern,
> > > > >             java.util.function.Consumer<TopicConsumerBuilder<T>>
> > > > > builderConsumer);
> > > > > }
> > > > > ```
> > > > >
> > > > > Create a new interface:
> > > > >
> > > > > ```java
> > > > > public interface TopicConsumerBuilder<T> {
> > > > >     TopicConsumerBuilder<T> priorityLevel(int priorityLevel);
> > > > >
> > > > >     ConsumerBuilder<T> build();
> > > > > }
> > > > > ```
> > > > >
> > > > > In pulsar-client-original, update `ConsumerConfigurationData` to
> > include
> > > > a
> > > > > new field:
> > > > >
> > > > > ```java
> > > > > @Data
> > > > > public class ConsumerConfigurationData<T> implements Serializable,
> > > > > Cloneable {
> > > > >     ...
> > > > >
> > > > >     private List<TopicConsumerConfigurationData>
> topicConfigurations
> > =
> > > > new
> > > > > ArrayList<>();
> > > > > }
> > > > > ```
> > > > >
> > > > > Create a topic configuration class:
> > > > >
> > > > > ```java
> > > > > @Data
> > > > > public class TopicConsumerConfigurationData implements
> Serializable {
> > > > >     private static final long serialVersionUID = 1L;
> > > > >
> > > > >     private String topicNameOrPattern;
> > > > >     private int priorityLevel;
> > > > >
> > > > >     public boolean matchesTopicName(String topicName) {
> > > > >         return
> > > > > Pattern.compile(topicNameOrPattern).matcher(topicName).matches();
> > > > >     }
> > > > >
> > > > >     public static TopicConsumerConfigurationData of(String
> > > > > topicNameOrPattern,
> > > > >             ConsumerConfigurationData<?> conf) {
> > > > >         return new
> TopicConsumerConfigurationData(topicNameOrPattern,
> > > > > conf.getPriorityLevel());
> > > > >     }
> > > > > }
> > > > > ```
> > > > >
> > > > > Then, in `ConsumerImpl` the appropriate topic configuration can be
> > > > selected
> > > > > based on the topic being subscribed to. Since the topic
> > configuration is
> > > > > effectively keyed by a topic name or pattern, it’s possible for the
> > user
> > > > to
> > > > > be
> > > > > able configure multiple topic configurations that match the same
> > concrete
> > > > > topic
> > > > > name. In this case the first topic name match should be selected.
> > > > >
> > > > > ```java
> > > > > TopicConsumerConfigurationData getMatchingTopicConfiguration(String
> > > > > topicName,
> > > > >         ConsumerConfigurationData conf) {
> > > > >     return topicConfigurations.stream()
> > > > >         .filter(topicConf -> topicConf.matchesTopicName(topicName))
> > > > >         .findFirst()
> > > > >         .orElseGet(() ->
> TopicConsumerConfigurationData.of(topicName,
> > > > > conf));
> > > > > }
> > > > > ```
> > > > >
> > > > > Example Usage:
> > > > >
> > > > > ```java
> > > > > pulsarClient.newConsumer()
> > > > >     .topicsPattern("events.*")
> > > > >     .priorityLevel(1)
> > > > >     .topicConfiguration(".*us-east-1", b -> b.priorityLevel(0))
> > > > >     .subscribe();
> > > > > ```
> > > > >
> > > > > or
> > > > >
> > > > > ```java
> > > > > pulsarClient.newConsumer()
> > > > >     .topicsPattern("events.*")
> > > > >     .priorityLevel(1)
> > > > >     .topicConfiguration(".*us-east-1")
> > > > >         .priorityLevel(0)
> > > > >         .build()
> > > > >     .subscribe();
> > > > > ```
> > > > >
> > > > > ## Rejected Alternatives
> > > > >
> > > > > * Extend the existing `ConsumerBuilder` rather than introducing a
> > nested,
> > > > > topic specific builder class.
> > > > >
> > > > > Rejection reason: Does not provide a clear API to discover and
> extend
> > > > other
> > > > > topic specific configuration options and overrides.
> > > > >
> > > > > ```java
> > > > > public interface ConsumerBuilder<T> extends Cloneable {
> > > > >     ...
> > > > >
> > > > >     ConsumerBuilder<T> topicPriorityLevel(String
> topicNameOrPattern,
> > int
> > > > > priorityLevel);
> > > > > }
> > > > > ```
> > > > >
> > > > > Example usage:
> > > > >
> > > > > ```java
> > > > > pulsarClient.newConsumer()
> > > > >     .topicsPattern("events.*")
> > > > >     .priorityLevel(1)
> > > > >     .topicPriorityLevel(".*us-east-1", 0)
> > > > >     .subscribe();
> > > > > ```
> > > > >
> > > > > * Provide a configurator interface to configure options and
> > overrides at
> > > > > runtime
> > > > >
> > > > > Rejection reason: Not compatible with `ConsumerBuilder.loadConf`.
> > > > >
> > > > > ```java
> > > > > @Data
> > > > > class TopicConsumerConfigurationData {
> > > > >     private int priorityLevel;
> > > > > }
> > > > > ```
> > > > >
> > > > > ```java
> > > > > interface TopicConsumerConfigurator extends Serializable {
> > > > >    void configure(String topicName, TopicConsumerConfigurationData
> > > > > topicConf);
> > > > > }
> > > > > ```
> > > > >
> > > > > ```java
> > > > > public interface ConsumerBuilder<T> extends Cloneable {
> > > > >     ...
> > > > >
> > > > >     ConsumerBuilder<T> topicConfigurator(TopicConsumerConfigurator
> > > > > configurator);
> > > > > }
> > > > > ```
> > > > >
> > > > > Example usage:
> > > > >
> > > > > ```java
> > > > > pulsarClient.newConsumer()
> > > > >     .topicsPattern("events.*")
> > > > >     .priorityLevel(1)
> > > > >     .topicConfigurator((topicName, topicConf) -> {
> > > > >         if (topicName.endsWith("us-east-1") {
> > > > >             topicConf.setPriorityLevel(0);
> > > > >         }
> > > > >     })
> > > > >     .subscribe();
> > > > > ```
> > > >
> >
>

Re: [DISCUSS] PIP-184: Topic specific consumer priorityLevel

Posted by Dave Maughan <da...@streamnative.io.INVALID>.
Hi Zike,

> What about using the Pattern type to store the compiled Pattern in the
> TopicConsumerConfigurationData?

Thanks for the suggestion - yes that's a good idea. I'll update the doc.

- Dave



Re: [DISCUSS] PIP-184: Topic specific consumer priorityLevel

Posted by Zike Yang <zi...@apache.org>.
Hi, Dave

Thanks for your proposals. Overall looks good to me. Just a minor comment:

What about using the Pattern type to store the compiled Pattern in the
TopicConsumerConfigurationData? Like below:
```
public class TopicConsumerConfigurationData implements Serializable {
    private static final long serialVersionUID = 1L;

    private Pattern topicsPattern;
    private int priorityLevel;
}
```
Just like what we did for the topicsPattern in the
ConsumerConfigurationData. [0]


[0] https://github.com/apache/pulsar/blob/bbf2a47867ff54327c1f2940e72f08a44a5dc5f7/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java#L58
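
As a rough sketch of this suggestion: the class below compiles the pattern once
at construction time instead of on every match, and relies on
`java.util.regex.Pattern` being `Serializable` so the configuration object can
still be serialized. `CompiledTopicConfig` and `CompiledPatternDemo` are
hypothetical names used only for illustration, not the classes in the PIP.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.regex.Pattern;

// Hypothetical variant of the topic configuration holding a compiled Pattern.
final class CompiledTopicConfig implements Serializable {
    private static final long serialVersionUID = 1L;

    private final Pattern topicsPattern; // java.util.regex.Pattern is Serializable
    private final int priorityLevel;

    CompiledTopicConfig(String regex, int priorityLevel) {
        this.topicsPattern = Pattern.compile(regex); // compiled once, not per match
        this.priorityLevel = priorityLevel;
    }

    boolean matchesTopicName(String topicName) {
        return topicsPattern.matcher(topicName).matches();
    }

    int getPriorityLevel() {
        return priorityLevel;
    }
}

final class CompiledPatternDemo {
    // Serializes the configuration to bytes and reads it back.
    static CompiledTopicConfig roundTrip(CompiledTopicConfig conf)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(conf);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (CompiledTopicConfig) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        CompiledTopicConfig copy = roundTrip(new CompiledTopicConfig(".*us-east-1", 0));
        System.out.println(copy.matchesTopicName("events-us-east-1")); // true
        System.out.println(copy.matchesTopicName("events-eu-west-1")); // false
    }
}
```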

Zike Yang


On Sat, Jul 9, 2022 at 12:10 AM Dave Maughan
<da...@streamnative.io.invalid> wrote:
>
> Hi Enrico,
>
> > Why can't you create multiple independent Consumers ?
> > They will share the same resources (memory pools, thread pools)
>
> You can do that but it means the user has to manage the consumption from
> each of the consumer instances manually. This is already solved in the
> existing multi topic consumer. This change adds user convenience - to
> continue consuming from multiple topics in a single consumer instance, but
> allowing them to customise the configuration.
>
> - Dave
>
> On Fri, Jul 8, 2022 at 4:26 PM Enrico Olivelli <eo...@gmail.com> wrote:
>
> > Dave,
> > Why can't you create multiple independent Consumers ?
> > They will share the same resources (memory pools, thread pools)
> >
> >
> >
> >
> > Enrico
> >
> > On Fri, Jul 8, 2022 at 3:00 PM Dave Maughan
> > <da...@streamnative.io.invalid> wrote:
> > >
> > > Example usage:
> > >
> > > ```java
> > > pulsarClient.newConsumer()
> > >     .topicsPattern("events.*")
> > >     .priorityLevel(1)
> > >     .topicPriorityLevel(".*us-east-1", 0)
> > >     .subscribe();
> > > ```
> > >
> > > * Provide a configurator interface to configure options and overrides at
> > > runtime
> > >
> > > Rejection reason: Not compatible with `ConsumerBuilder.loadConf`.
> > >
> > > ```java
> > > @Data
> > > class TopicConsumerConfigurationData {
> > >     private int priorityLevel;
> > > }
> > > ```
> > >
> > > ```java
> > > interface TopicConsumerConfigurator extends Serializable {
> > >    void configure(String topicName, TopicConsumerConfigurationData
> > > topicConf);
> > > }
> > > ```
> > >
> > > ```java
> > > public interface ConsumerBuilder<T> extends Cloneable {
> > >     ...
> > >
> > >     ConsumerBuilder<T> topicConfigurator(TopicConsumerConfigurator
> > > configurator);
> > > }
> > > ```
> > >
> > > Example usage:
> > >
> > > ```java
> > > pulsarClient.newConsumer()
> > >     .topicsPattern("events.*")
> > >     .priorityLevel(1)
> > >     .topicConfigurator((topicName, topicConf) -> {
> > >         if (topicName.endsWith("us-east-1") {
> > >             topicConf.setPriorityLevel(0);
> > >         }
> > >     })
> > >     .subscribe();
> > > ```
> >

Re: [DISCUSS] PIP-184: Topic specific consumer priorityLevel

Posted by Dave Maughan <da...@streamnative.io.INVALID>.
Hi Enrico,

> Why can't you create multiple independent Consumers?
> They will share the same resources (memory pools, thread pools)


You can do that, but it means the user has to manage consumption from each
consumer instance manually. The existing multi-topic consumer already solves
this. This change is a convenience: users can keep consuming from multiple
topics in a single consumer instance while customising the configuration per
topic.
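
To make the per-topic customisation concrete, here is a minimal sketch of the
lookup rule described in the proposal: the first configured name or pattern
that matches the topic wins, otherwise the consumer-level priority applies.
It uses only the JDK. `TopicPrioritySketch` and its methods are hypothetical
stand-ins for illustration, not Pulsar API, and patterns are compiled once at
registration time as a design choice of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical stand-in for the proposal's per-topic lookup; not Pulsar API.
public class TopicPrioritySketch {

    // One override: a precompiled topic pattern and the priority level it assigns.
    private static final class TopicOverride {
        final Pattern pattern;
        final int priorityLevel;

        TopicOverride(String topicNameOrPattern, int priorityLevel) {
            this.pattern = Pattern.compile(topicNameOrPattern);
            this.priorityLevel = priorityLevel;
        }
    }

    private final int defaultPriorityLevel;
    private final List<TopicOverride> overrides = new ArrayList<>();

    public TopicPrioritySketch(int defaultPriorityLevel) {
        this.defaultPriorityLevel = defaultPriorityLevel;
    }

    // Registers an override; registration order decides which match wins.
    public TopicPrioritySketch topicConfiguration(String topicNameOrPattern, int priorityLevel) {
        overrides.add(new TopicOverride(topicNameOrPattern, priorityLevel));
        return this;
    }

    // Returns the priority of the first override whose pattern matches the
    // whole topic name, or the consumer-level default if none matches.
    public int priorityLevelFor(String topicName) {
        for (TopicOverride override : overrides) {
            if (override.pattern.matcher(topicName).matches()) {
                return override.priorityLevel;
            }
        }
        return defaultPriorityLevel;
    }

    public static void main(String[] args) {
        TopicPrioritySketch conf = new TopicPrioritySketch(1)
                .topicConfiguration(".*us-east-1", 0)
                .topicConfiguration("events.*", 2);

        System.out.println(conf.priorityLevelFor("events-us-east-1")); // 0: first match wins
        System.out.println(conf.priorityLevelFor("events-eu-west-1")); // 2: second pattern
        System.out.println(conf.priorityLevelFor("audit-eu-west-1"));  // 1: default
    }
}
```

With separate consumers, the application would have to hold this mapping
itself and poll each consumer; with the proposed option, a single multi-topic
consumer could apply it internally.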

- Dave

On Fri, Jul 8, 2022 at 4:26 PM Enrico Olivelli <eo...@gmail.com> wrote:

> Dave,
> Why can't you create multiple independent Consumers?
> They will share the same resources (memory pools, thread pools)
>
> Enrico

Re: [DISCUSS] PIP-184: Topic specific consumer priorityLevel

Posted by Enrico Olivelli <eo...@gmail.com>.
Dave,
Why can't you create multiple independent Consumers?
They will share the same resources (memory pools, thread pools)

Enrico
