Posted to dev@kafka.apache.org by Rico Lugod <rn...@gmail.com> on 2016/12/16 08:28:11 UTC

Multiple Consumer Group In Single Topic

Hi Guys,

Good day!

I have a question about how to consume only the specific messages that belong
to a given consumer group.

Here's the scenario:

Publish message "A" to topic "X"
Consume by Consumer Group A the message "A" from topic "X"

Publish message "B" to topic "X"
Consume by Consumer Group B the message "B" from topic "X"

How can I achieve this in Scala? At the moment both consumers consume every
message, even though each one is assigned its own groupId.

Here is my producer code:

def main(args: Array[String]): Unit = {
  val zookeeperUrl: String = "localhost:2182"
  val kafkaServerUrl: String = "localhost:9092,localhost:9093"
  val topic: String = "Topic_X"
  val groupId: String = "Consumer_Group_A"
  val eventType: String = "delete"
  val deleteRetention: Int = 5240000

  // Build the producer config via our helper and create the producer
  val producerConfig = ProducerService.createProducerConfig(
    zookeeperUrl, kafkaServerUrl, topic, groupId, deleteRetention.toString)
  val producer = new KafkaProducer[String, String](producerConfig)

  // JSON payload with random batch_id / document_id and the event type
  val producerPayload = ProducerPayload(
    "{\"batch_id\":\"" + UUID.randomUUID().toString()
      + "\", \"document_id\":\"" + UUID.randomUUID().toString()
      + "\", \"type\":\"" + eventType + "\"}",
    "", topic, groupId, deleteRetention)

  ProducerService.sendMessage(producer, zookeeperUrl, kafkaServerUrl, producerPayload)
  logger.info("Done.")
}


Here is my consumer code:

def consumeMessage = Action { implicit rs =>
  val zookeeperUrl: String = "localhost:2182"
  val kafkaServerUrl: String = "localhost:9092,localhost:9093"
  val topic: String = "Topic_X"
  val groupId: String = "Consumer_Group_A"

  // Old high-level consumer: one stream for the topic
  val config = ConsumerService.createConsumerConfig(zookeeperUrl, kafkaServerUrl, groupId)
  val consumer = kafka.consumer.Consumer.create(config)
  val consumerMap = consumer.createMessageStreams(Map(topic -> 1))
  val streams = consumerMap.get(topic).get
  val it = streams(0).iterator()

  while (it.hasNext()) {
    val msg = new String(it.next().message())
    logger.info(s"Message successfully consumed from topic ${topic} => " + msg)
  }

  consumer.shutdown()
  logger.info("Done.")
  Ok
}




Your help is much appreciated. Thank you.

Sincerely yours,


Rico Nodalo Lugod
Senior Java / J2EE / SOA - Developer
Cebu City, Philippines 6000

Email: rnl2004@gmail.com

Re: Multiple Consumer Group In Single Topic

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
Rico,

Every consumer group will see all messages for the topics they are
subscribed to. If you want to filter these messages by consumer group,
you'd need to perform that filtering yourself after the messages are
returned by the consumer. To do so, you'd need to include enough
information in the messages to perform this filtering (e.g. you might
include metadata indicating which consumer group the message is intended
for, although if you are doing that it might be better to simply use
different topics for the two sets of messages).
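
For illustration, here is a minimal consumer-side sketch of that filtering
approach. It assumes the producer adds a target_group field to the JSON
payload (that field, the object name, and the crude string matching are
placeholders for this example, not part of the original code), and it uses the
org.apache.kafka.clients.consumer.KafkaConsumer API rather than the old
high-level consumer shown earlier:

import java.util.{Arrays, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

object FilteringConsumer {
  def main(args: Array[String]): Unit = {
    val topic = "Topic_X"
    val myGroup = "Consumer_Group_A"   // the logical group this process handles

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092,localhost:9093")
    props.put("group.id", myGroup)
    props.put("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "earliest")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Arrays.asList(topic))

    try {
      while (true) {
        // Every group subscribed to Topic_X still receives every message;
        // the filtering below happens purely in application code.
        for (record <- consumer.poll(500).asScala) {
          val value = record.value()
          // Crude string check for the hypothetical target_group field;
          // a real application would parse the JSON with a proper library.
          if (value.contains("\"target_group\":\"" + myGroup + "\""))
            println(s"$myGroup handling: $value")
          // messages addressed to other groups are simply ignored
        }
      }
    } finally {
      consumer.close()
    }
  }
}

If the two kinds of messages can already be told apart at produce time, the
simpler option is the one mentioned above: write them to two separate topics
(say Topic_X_A and Topic_X_B) and have each group subscribe only to its own
topic, so no application-level filtering is needed.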

-Ewen
