You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/03/09 11:46:47 UTC
[GitHub] [pulsar] liketic commented on issue #3733: support Pattern
subscribe in kafka compat client
liketic commented on issue #3733: support Pattern subscribe in kafka compat client
URL: https://github.com/apache/pulsar/issues/3733#issuecomment-471170264
Thank you @jiazhai @sijie . The method ```subscribe``` https://github.com/apache/pulsar/blob/41b286dc0cc7b6ca3aa24acfccd53dfed547c0c5/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java#L215
lists all partitions for each topic and create a ```TopicPartition``` instance for each pair of them, and update the member variables ```consumers``` and ```unpolledPartitions``` for later use.
What I mean is if we need to do the same things, try to get all topics matched the pattern and then get all partitions for each topic... , which means we need a method to list all topics based on a regex in client.
Otherwise, what I can think of is something like:
```
@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
ConsumerBuilder<byte[]> consumerBuilder = PulsarConsumerKafkaConfig.getConsumerBuilder(client, properties);
consumerBuilder.subscriptionType(SubscriptionType.Failover);
consumerBuilder.messageListener(this);
consumerBuilder.subscriptionName(groupId);
consumerBuilder.topicsPattern(pattern);
try {
consumerBuilder.subscribe();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services