Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/06/05 06:48:20 UTC
[GitHub] [pulsar] liudezhi2098 opened a new pull request #7179: fix kafka_0_9 Consumer partition topic throw error
liudezhi2098 opened a new pull request #7179:
URL: https://github.com/apache/pulsar/pull/7179
Master Issue: #7178
## Motivation
When pulsar-client-kafka_0_9 is used to consume a partitioned topic, it throws a java.lang.ClassCastException because:
```
org.apache.pulsar.client.impl.TopicMessageIdImpl cannot be cast to org.apache.pulsar.client.impl.MessageIdImpl
```
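The failure mode can be illustrated in isolation. The following is a minimal sketch, not Pulsar code: `MessageId`, `PlainMessageId`, and `TopicScopedMessageId` are hypothetical stand-ins for the roles played by `MessageId`, `MessageIdImpl`, and `TopicMessageIdImpl`, under the assumption that the topic-scoped ID wraps an inner ID instead of extending the plain implementation. A direct downcast then works for a plain ID and fails for a wrapped one.

```java
public class MessageIdCastSketch {
    // Hypothetical stand-ins for illustration only; these are NOT the real Pulsar classes.
    interface MessageId {}

    // Plays the role of a single-topic message ID.
    static final class PlainMessageId implements MessageId {
        final long ledgerId;
        final long entryId;

        PlainMessageId(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    // Plays the role of an ID produced by a multi-topic consumer:
    // it wraps another ID and is not a subtype of PlainMessageId (assumption).
    static final class TopicScopedMessageId implements MessageId {
        final String topicPartitionName;
        final MessageId inner;

        TopicScopedMessageId(String topicPartitionName, MessageId inner) {
            this.topicPartitionName = topicPartitionName;
            this.inner = inner;
        }
    }

    // Mirrors code that blindly downcasts every message ID to the plain type.
    static long entryIdOf(MessageId id) {
        return ((PlainMessageId) id).entryId;
    }

    public static void main(String[] args) {
        MessageId plain = new PlainMessageId(7L, 42L);
        MessageId wrapped = new TopicScopedMessageId("my-topic-partition-0", plain);

        // The plain ID can be downcast without problems.
        System.out.println("plain entryId = " + entryIdOf(plain));

        // The wrapped ID cannot, which is the same class of error as in the report.
        try {
            entryIdOf(wrapped);
        } catch (ClassCastException e) {
            System.out.println("wrapped ID: ClassCastException");
        }
    }
}
```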
## Modifications
BrokerService
```java
public void subscribe(List<String> topics, ConsumerRebalanceListener callback) {
    List<CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>>> futures = new ArrayList<>();
    List<TopicPartition> topicPartitions = new ArrayList<>();
    try {
        for (String topic : topics) {
            // Create individual subscription on each partition, that way we can keep using the
            // acknowledgeCumulative()
            int numberOfPartitions = ((PulsarClientImpl) client).getNumberOfPartitions(topic).get();
            ConsumerBuilder<byte[]> consumerBuilder = PulsarConsumerKafkaConfig.getConsumerBuilder(client, properties);
            consumerBuilder.subscriptionType(SubscriptionType.Failover);
            consumerBuilder.messageListener(this);
            consumerBuilder.subscriptionName(groupId);
            //consumerBuilder.topics(topics);
            if (numberOfPartitions > 1) {
                // Subscribe to each partition
                consumerBuilder.consumerName(ConsumerName.generateRandomName());
                for (int i = 0; i < numberOfPartitions; i++) {
                    String partitionName = TopicName.get(topic).getPartition(i).toString();
                    CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.clone()
                            .topic(partitionName).subscribeAsync();
                    int partitionIndex = i;
                    TopicPartition tp = new TopicPartition(
                            TopicName.get(topic).getPartitionedTopicName(),
                            partitionIndex);
                    futures.add(future.thenApply(consumer -> {
                        log.info("Add consumer {} for partition {}", consumer, tp);
                        consumers.putIfAbsent(tp, consumer);
                        return consumer;
                    }));
                    topicPartitions.add(tp);
```
We should remove the consumerBuilder.topics(topics) call, since each partition is already subscribed to individually through topic(partitionName).
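The per-partition loop in the excerpt relies on every partition being addressable as a topic of its own. A minimal sketch of that naming step follows, assuming Pulsar's `-partition-<index>` suffix convention; `partitionNames` is a hypothetical helper written for illustration, whereas the excerpt obtains the names through `TopicName.get(topic).getPartition(i)`.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionNamesSketch {
    // Hypothetical helper: builds one topic name per partition by appending
    // the "-partition-<index>" suffix (assumed naming convention).
    static List<String> partitionNames(String topic, int numberOfPartitions) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < numberOfPartitions; i++) {
            names.add(topic + "-partition-" + i);
        }
        return names;
    }

    public static void main(String[] args) {
        // Each printed name would be passed to its own single-topic subscription.
        for (String name : partitionNames("persistent://public/default/my-topic", 3)) {
            System.out.println(name);
        }
    }
}
```

Subscribing to each of these names separately gives one consumer per partition, which is what lets the excerpt map every consumer to a single TopicPartition.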
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] sijie commented on pull request #7179: fix kafka_0_9 Consumer partition topic throw error
Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #7179:
URL: https://github.com/apache/pulsar/pull/7179#issuecomment-640312028
/pulsarbot run-failure-checks
[GitHub] [pulsar] codelipenghui merged pull request #7179: fix kafka_0_9 Consumer partition topic throw error
Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #7179:
URL: https://github.com/apache/pulsar/pull/7179