You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Laxmi Narayan <ni...@gmail.com> on 2017/08/22 07:54:45 UTC

exception in multi threaded env for manual topic assignment

Hi ,
I am getting below exception while reading topic partitions individual.

Exception: Subscription to topics, partitions and pattern are mutually
exclusivejava.lang.IllegalStateException

I am passing new KafkaConsumer for each thread and topic's partition number
and same consumer group for entire thread pool.

Sample code :

@Override
    public void listenToPartition(String topicGroupId, String topicName,
int partitionNumber, long poolingTimeOut) {

        System.out.println("subscribed topic : " + topicName);

     *   kafkaConsumer.subscribe(Arrays.asList(topicName));*

*        TopicPartition topicPartition1 = new TopicPartition(topicName,
partitionNumber);*

        System.out.println(" | partition number : " +
topicPartition1.partition() + " |");

        try {
*
kafkaConsumer.assign(Collections.singletonList(topicPartition1));*

            System.out.println("assigned..." + partitionNumber+" | ");

            while (true) {

                ConsumerRecords<String, String> consumerRecords =
kafkaConsumer.poll(100);

                for (ConsumerRecord<String, String> consumerRecord :
consumerRecords) {

                    System.out.println("assigned... consumer record..");
                    System.out.println(" consumer rec : " +
consumerRecord.partition() + " | "
                            + consumerRecord.value() + " | " +
consumerRecord.key());
                }
            }
        } catch (Exception ex) {
            System.out.println("exception occured while" + ex.getMessage());
            ex.printStackTrace();
        }
    }



Keep learning keep moving .....