Posted to users@kafka.apache.org by "Liel Shraga (lshraga)" <ls...@cisco.com> on 2017/09/13 07:54:14 UTC

Round Robin for several consumers in KAFKA

Hi,

I have 5 separate Docker images: 1 Kafka broker, 1 ZooKeeper, 1 producer and 2 consumers.
I publish messages to the topic via the producer.
Basically, I would like the messages to be distributed between the consumers in a round-robin fashion,
so for that purpose I defined the consumers with the same group.id and set partition.assignment.strategy to org.apache.kafka.clients.consumer.RoundRobinAssignor,
but in practice only 1 consumer receives all the messages.
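My Producer Code:
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DiscoveryKafkaProducer {
    private final Producer<String, String> producer;

    public DiscoveryKafkaProducer(Properties configs) {
        producer = new KafkaProducer<String, String>(configs);
    }

    // Sends each record without a key, so the producer's partitioner picks the partition.
    public void send(String topic, List<String> records) {
        for (String record : records) {
            producer.send(new ProducerRecord<String, String>(topic, record));
        }
        // Block until all buffered records have been sent.
        producer.flush();
    }

    public void close() throws Exception {
        producer.close();
    }
}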

My Consumer Code:
public static void main(String[] args) {
    String server = "lshraga-ubuntu-sp-nac:9092";
    Properties consumerConfigs = new Properties();
    consumerConfigs.put("bootstrap.servers", server);
    consumerConfigs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerConfigs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // Both consumers use the same group.id, so they share the topic's partitions.
    consumerConfigs.put("group.id", "discovery");
    consumerConfigs.put("client.id", "discovery");
    consumerConfigs.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
    List<String> list = new ArrayList<String>();
    // topicName is not defined in this snippet; it is assumed to be declared elsewhere.
    DiscoveryKafkaConsumer consumer1 = new DiscoveryKafkaConsumer(Collections.singletonList(topicName), consumerConfigs);

    try {
        while (true) {
            System.out.println("Start to consume");
            consumer1.poll(1000L);
        }
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

public class DiscoveryKafkaConsumer {
    Consumer<String, String> consumer;
    Integer id; // stays null when the constructor without an id is used

    public DiscoveryKafkaConsumer(List<String> topics, Properties configs) {
        consumer = new KafkaConsumer<String, String>(configs);
        consumer.subscribe(topics);
    }

    public DiscoveryKafkaConsumer(int i, List<String> topics, Properties configs) {
        consumer = new KafkaConsumer<String, String>(configs);
        consumer.subscribe(topics);
        this.id = i;
    }

    public void poll(long timeout) throws InterruptedException {
        ConsumerRecords<String, String> records = consumer.poll(timeout);
        System.out.println("Hey! Consumer #" + id + " got records: " + records);
        // Group the received values by topic.
        Map<String, List<String>> results = new HashMap<String, List<String>>();
        records.forEach((cr) -> {
            System.out.println("cr.topic()=" + cr.topic());
            List<String> list = results.get(cr.topic());
            if (list == null) {
                list = new ArrayList<>();
                results.put(cr.topic(), list);
            }
            list.add(cr.value());
            System.out.println("list=" + list);
        });
    }

    public void close() throws Exception {
        consumer.close();
    }
}
What do I need to add/configure in order to consume the messages in a round-robin fashion?


Thanks,


Liel Shraga
ENGINEER.SOFTWARE ENGINEERING
lshraga@cisco.com
Tel: +972 2 588 6394

Cisco Systems, Inc.
32 HaMelacha St., (HaSharon Bldg) P.O.Box 8735, I.Z.Sapir
SOUTH NETANYA
42504
Israel
cisco.com


Think before you print.

This email may contain confidential and privileged material for the sole use of the intended recipient. Any review, use, distribution or disclosure by others is strictly prohibited. If you are not the intended recipient (or authorized to receive for the recipient), please contact the sender by reply email and delete all copies of this message.
Please click here<http://www.cisco.com/web/about/doing_business/legal/cri/index.html> for Company Registration Information.



RE: Round Robin for several consumers in KAFKA

Posted by "Liel Shraga (lshraga)" <ls...@cisco.com>.
No, only the number of consumers.


 





-----Original Message-----
From: Amir Shahinpour [mailto:amir@holisticlabs.net] 
Sent: Tuesday, September 19, 2017 9:28 AM
To: users@kafka.apache.org
Subject: Re: Round Robin for several consumers in KAFKA

Does it mean that the number of brokers changes from time to time?


Re: Round Robin for several consumers in KAFKA

Posted by Amir Shahinpour <am...@holisticlabs.net>.
Does it mean that the number of brokers changes from time to time?

On Mon, Sep 18, 2017 at 11:10 PM, Liel Shraga (lshraga) <ls...@cisco.com>
wrote:

> Hi,
>
> My docker compose file is :
>
> version: '2'
> services:
>   zookeeper:
>     image: wurstmeister/zookeeper
>     ports:
>       - "2181:2181"
>   kafka:
>     image: wurstmeister/kafka
>     ports:
>       - "9092:9092"
>     environment:
>       KAFKA_ADVERTISED_HOST_NAME: lshraga-ubuntu-sp-nac
>       KAFKA_ADVERTISED_PORT: 9092
>       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
>       KAFKA_NUM_PARTITIONS: 10
>     volumes:
>       - /var/run/docker.sock:/var/run/docker.sock
>
> Basically, I increased KAFKA_NUM_PARTITIONS to 10, but I
> need to do it dynamically, since I have several microservices which are
> consumers and they grow and shrink dynamically at runtime.
> Is there a way to change the number of partitions dynamically via Java code?
>
> Thanks,
>
>
>
>

RE: Round Robin for several consumers in KAFKA

Posted by "Liel Shraga (lshraga)" <ls...@cisco.com>.
Hi,

My docker compose file is : 

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: lshraga-ubuntu-sp-nac
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_NUM_PARTITIONS: 10
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

Basically, I increased KAFKA_NUM_PARTITIONS to 10, but I need to do it dynamically, since I have several microservices which are consumers and they grow and shrink dynamically at runtime.
Is there a way to change the number of partitions dynamically via Java code?

Thanks,
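
On the question of adjusting partitions from Java: Kafka's AdminClient (part of kafka-clients since 0.11) can create a topic with an explicit partition count, and from Kafka 1.0 it also offers createPartitions to increase the count of an existing topic. A partition count can only be increased, never reduced, and within one consumer group each partition is read by at most one consumer. The sketch below is plain Java with no Kafka dependency and only shows the sizing arithmetic. PartitionPlanSketch, its method names and the consumer count of 14 are hypothetical, and the actual AdminClient call is left as a comment because it needs a running broker.

```java
public class PartitionPlanSketch {

    // Partition count to request so that every consumer can own at least one partition.
    // Kafka can only add partitions, so the result is never below the current count.
    public static int targetPartitions(int currentPartitions, int consumerCount) {
        if (currentPartitions < 1 || consumerCount < 0) {
            throw new IllegalArgumentException(
                    "need currentPartitions >= 1 and consumerCount >= 0");
        }
        return Math.max(currentPartitions, consumerCount);
    }

    // Consumers beyond the partition count get no assignment and sit idle.
    public static int idleConsumers(int partitions, int consumerCount) {
        return Math.max(0, consumerCount - partitions);
    }

    public static void main(String[] args) {
        int current = 10;   // KAFKA_NUM_PARTITIONS from the compose file
        int consumers = 14; // hypothetical number of running consumer services

        System.out.println("idle consumers now: " + idleConsumers(current, consumers));

        int target = targetPartitions(current, consumers);
        System.out.println("partitions to request: " + target);

        // With kafka-clients 1.0 or later, 'target' could then be applied through
        // AdminClient.createPartitions(...) using NewPartitions.increaseTo(target).
    }
}
```

Because the count cannot be lowered again, it may be simpler to create the topic with more partitions than the expected peak number of consumers and let the group rebalance as consumers come and go. Adding partitions later also changes which partition a given key maps to, which matters only if keyed ordering is relied on.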

 





-----Original Message-----
From: Amir Shahinpour [mailto:amir@holisticlabs.net] 
Sent: Tuesday, September 19, 2017 9:07 AM
To: users@kafka.apache.org
Subject: Re: Round Robin for several consumers in KAFKA

Can you provide your docker file, or compose file?


Re: Round Robin for several consumers in KAFKA

Posted by Amir Shahinpour <am...@holisticlabs.net>.
Can you provide your Dockerfile or your Compose file?


RE: Round Robin for several consumers in KAFKA

Posted by "Liel Shraga (lshraga)" <ls...@cisco.com>.
Hi,

I didn't define the number of partitions. How can I set it with the kafka-clients API?

Thanks,


 


Liel Shraga
Re: Round Robin for several consumers in KAFKA

Posted by Manikumar <ma...@gmail.com>.
What is the partition count of the topic? You need at least 2 partitions to
distribute messages across two consumers.
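
To illustrate the point, here is a minimal sketch of why a single partition
leaves the second consumer idle. It is a simplified model written for this
thread, not the source of Kafka's RoundRobinAssignor; the class name
RoundRobinSketch, the method assign, and the consumer names are made up for
the example. It assumes one topic and one consumer group, and deals the
partitions out to the group members in turn.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of round-robin partition assignment inside one consumer
// group. Hypothetical illustration only, not Kafka's actual implementation.
class RoundRobinSketch {

    // Deals partitions 0..numPartitions-1 to the consumers in turn.
    // Each partition ends up with exactly one owner in the group.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> group = Arrays.asList("consumer-1", "consumer-2");
        // One partition: only the first consumer gets anything to read.
        System.out.println(assign(group, 1)); // {consumer-1=[0], consumer-2=[]}
        // Two partitions: each consumer owns one partition.
        System.out.println(assign(group, 2)); // {consumer-1=[0], consumer-2=[1]}
    }
}
```

The assignor balances partitions, not individual messages. With a single
partition there is nothing to share, so one consumer receives every record
regardless of the configured strategy.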

