Posted to users@kafka.apache.org by Laxmi Narayan <ni...@gmail.com> on 2017/08/22 07:54:45 UTC
exception in multi threaded env for manual topic assignment
Hi,
I am getting the exception below while reading a topic's partitions individually.
java.lang.IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive
I am passing a new KafkaConsumer to each thread, together with the topic's
partition number, and using the same consumer group for the entire thread pool.
Sample code:
@Override
public void listenToPartition(String topicGroupId, String topicName,
                              int partitionNumber, long poolingTimeOut) {
    System.out.println("subscribed topic : " + topicName);
    // The subscribe() call, the TopicPartition construction, and the assign()
    // call below were highlighted in the original post.
    kafkaConsumer.subscribe(Arrays.asList(topicName));
    TopicPartition topicPartition1 = new TopicPartition(topicName, partitionNumber);
    System.out.println(" | partition number : " + topicPartition1.partition() + " |");
    try {
        kafkaConsumer.assign(Collections.singletonList(topicPartition1));
        System.out.println("assigned..." + partitionNumber + " | ");
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println("assigned... consumer record..");
                System.out.println(" consumer rec : " + consumerRecord.partition() + " | "
                        + consumerRecord.value() + " | " + consumerRecord.key());
            }
        }
    } catch (Exception ex) {
        System.out.println("exception occurred while " + ex.getMessage());
        ex.printStackTrace();
    }
}
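A note on the likely cause: the sample code calls subscribe() and then assign() on the same consumer, and a single KafkaConsumer instance does not allow topic subscription to be mixed with manual partition assignment. The sketch below is a simplified, self-contained model of that rule, not Kafka's actual implementation. The names SubscriptionModel, Mode, and SubscriptionModelDemo are hypothetical and exist only for illustration. It shows that subscribe() followed by assign() is rejected, while assign() alone is accepted.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a consumer's subscription bookkeeping.
// It only models the "one mode per consumer" rule; it is not Kafka code.
final class SubscriptionModel {
    enum Mode { NONE, AUTO_TOPICS, USER_ASSIGNED }

    private Mode mode = Mode.NONE;

    // Once a mode has been chosen, switching to a different one is rejected.
    private void setMode(Mode requested) {
        if (mode == Mode.NONE) {
            mode = requested;
        } else if (mode != requested) {
            throw new IllegalStateException(
                    "Subscription to topics, partitions and pattern are mutually exclusive");
        }
    }

    // Models subscribing by topic name (group-managed assignment).
    void subscribe(List<String> topics) {
        setMode(Mode.AUTO_TOPICS);
    }

    // Models manual assignment of specific partitions.
    void assign(List<Integer> partitions) {
        setMode(Mode.USER_ASSIGNED);
    }

    Mode mode() {
        return mode;
    }
}

class SubscriptionModelDemo {
    // Reproduces the call order from the post; returns the error message,
    // or null if the sequence is accepted.
    static String subscribeThenAssign() {
        SubscriptionModel model = new SubscriptionModel();
        model.subscribe(Collections.singletonList("my-topic"));
        try {
            model.assign(Collections.singletonList(0));
            return null;
        } catch (IllegalStateException ex) {
            return ex.getMessage();
        }
    }

    // Manual assignment only, with no subscribe() call beforehand.
    static SubscriptionModel.Mode assignOnly() {
        SubscriptionModel model = new SubscriptionModel();
        model.assign(Collections.singletonList(0));
        return model.mode();
    }

    public static void main(String[] args) {
        System.out.println("subscribe then assign: " + subscribeThenAssign());
        System.out.println("assign only: " + assignOnly());
    }
}
```

If this model matches what is happening, removing the kafkaConsumer.subscribe(...) line and keeping only kafkaConsumer.assign(...) in listenToPartition should avoid the exception, since each thread already has its own consumer and its own partition number.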
Keep learning keep moving .....