You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/08 12:58:05 UTC

[GitHub] [pulsar] nahguam opened a new issue, #16481: PIP-184: Topic specific consumer priorityLevel

nahguam opened a new issue, #16481:
URL: https://github.com/apache/pulsar/issues/16481

   ## Motivation
   
   The Pulsar consumer supports setting a priority level for priority message
   dispatch in shared subscription consumers and priority assignment in failover
   subscription consumers. See the [ConsumerBuilder.html#priorityLevel(int)
   Javadoc](https://javadoc.io/static/org.apache.pulsar/pulsar-client-api/2.10.1/org/apache/pulsar/client/api/ConsumerBuilder.html#priorityLevel(int))
   for a detailed functional description. The Pulsar Java consumer also supports
   consuming from multiple topics. However, it is not possible to set a different
   priority level for different topics in the same Consumer instance.
   
   This behaviour is desirable in some use cases. For example, a consumer
   processing region specific topics might wish to configure region stickiness - A
   multi-region application might be consuming from topics events-us-east-1 and
   events-eu-west-1. Consumers in all regions should be configured to consume all
   topics to ensure data completeness. However, to ensure low latency, the
   us-east-1 consumer would need to set a higher priority level for the us-east-1
   topic. Similarly, the eu-west-1 consumer would need to set a higher priority
   level for the eu-west-1 topic.
   
   ## Goal
   
   Update the Java client API to allow the configuration of different priority
   levels for different topics.
   
   Do so in such a way that supports the addition of other topic specific
   configuration options or overrides in the future.
   
   Issues will be created to track feature parity in the other client
   implementations for this PIP.
   
   ## API Changes
   
   In pulsar-client-api, update `ConsumerBuilder` to include two new methods:
   
   ```java
   public interface ConsumerBuilder<T> extends Cloneable {
       ...
   
       TopicConsumerBuilder<T> topicConfiguration(String topicNameOrPattern);
       
       ConsumerBuilder<T> topicConfiguration(String topicNameOrPattern, 
               java.util.function.Consumer<TopicConsumerBuilder<T>> builderConsumer);
   }
   ```
   
   Create a new interface:
   
   ```java
   public interface TopicConsumerBuilder<T> {
       TopicConsumerBuilder<T> priorityLevel(int priorityLevel);
   
       ConsumerBuilder<T> build();
   }
   ```
   
   In pulsar-client-original, update `ConsumerConfigurationData` to include a new field:
   
   ```java
   @Data
   public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
       ...
   
       private List<TopicConsumerConfigurationData> topicConfigurations = new ArrayList<>();
   }
   ```
   
   Create a topic configuration class:
   
   ```java
   @Data
   public class TopicConsumerConfigurationData implements Serializable {
       private static final long serialVersionUID = 1L;
   
       private String topicNameOrPattern;
       private int priorityLevel;
   
       public boolean matchesTopicName(String topicName) {
           return Pattern.compile(topicNameOrPattern).matcher(topicName).matches();
       }
   
       public static TopicConsumerConfigurationData of(String topicNameOrPattern, 
               ConsumerConfigurationData<?> conf) {
           return new TopicConsumerConfigurationData(topicNameOrPattern, conf.getPriorityLevel());
       }
   }
   ```
   
   Then, in `ConsumerImpl` the appropriate topic configuration can be selected
   based on the topic being subscribed to. Since the topic configuration is
   effectively keyed by a topic name or pattern, it’s possible for the user to be
   able configure multiple topic configurations that match the same concrete topic
   name. In this case the first topic name match should be selected.
   
   ```java
   TopicConsumerConfigurationData getMatchingTopicConfiguration(String topicName, 
           ConsumerConfigurationData conf) {
       return topicConfigurations.stream()
           .filter(topicConf -> topicConf.matchesTopicName(topicName))
           .findFirst()
           .orElseGet(() -> TopicConsumerConfigurationData.of(topicName, conf));
   }
   ```
   
   Example Usage:
   
   ```java
   pulsarClient.newConsumer()
       .topicsPattern("events.*")
       .priorityLevel(1)
       .topicConfiguration(".*us-east-1", b -> b.priorityLevel(0))
       .subscribe();
   ```
   
   or
   
   ```java
   pulsarClient.newConsumer()
       .topicsPattern("events.*")
       .priorityLevel(1)
       .topicConfiguration(".*us-east-1")
           .priorityLevel(0)
           .build()
       .subscribe();
   ```
   
   ## Rejected Alternatives
   
   * Extend the existing `ConsumerBuilder` rather than introducing a nested, topic specific builder class.
   
   Rejection reason: Does not provide a clear API to discover and extend other topic specific configuration options and overrides.
   
   ```java
   public interface ConsumerBuilder<T> extends Cloneable {
       ...
   
       ConsumerBuilder<T> topicPriorityLevel(String topicNameOrPattern, int priorityLevel);
   }
   ```
   
   Example usage:
   
   ```java
   pulsarClient.newConsumer()
       .topicsPattern("events.*")
       .priorityLevel(1)
       .topicPriorityLevel(".*us-east-1", 0)
       .subscribe();
   ```
   
   * Provide a configurator interface to configure options and overrides at runtime
   
   Rejection reason: Not compatible with `ConsumerBuilder.loadConf`.
   
   ```java
   @Data 
   class TopicConsumerConfigurationData {
       private int priorityLevel;
   }
   ```
   
   ```java
   interface TopicConsumerConfigurator extends Serializable {
      void configure(String topicName, TopicConsumerConfigurationData topicConf);
   }
   ```
   
   ```java
   public interface ConsumerBuilder<T> extends Cloneable {
       ...
   
       ConsumerBuilder<T> topicConfigurator(TopicConsumerConfigurator configurator);
   }
   ```
   
   Example usage:
   
   ```java
   pulsarClient.newConsumer()
       .topicsPattern("events.*")
       .priorityLevel(1)
       .topicConfigurator((topicName, topicConf) -> {
           if (topicName.endsWith("us-east-1") {
               topicConf.setPriorityLevel(0);
           }
       })
       .subscribe();
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] nahguam commented on issue #16481: PIP-184: Topic specific consumer priorityLevel

Posted by GitBox <gi...@apache.org>.
nahguam commented on issue #16481:
URL: https://github.com/apache/pulsar/issues/16481#issuecomment-1199072091

   I've updated the design to keep the concrete topic names separate from the topic patterns in the API to avoid unwanted regex behaviour like like hyphens and illegal character ranges etc.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- closed issue #16481: PIP-184: Topic specific consumer priorityLevel

Posted by GitBox <gi...@apache.org>.
Technoboy- closed issue #16481: PIP-184: Topic specific consumer priorityLevel
URL: https://github.com/apache/pulsar/issues/16481


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org