Posted to users@kafka.apache.org by Serega Sheypak <se...@gmail.com> on 2017/04/18 08:30:13 UTC

Consuming from Kafka 8.2.2 from Actor. Can't reuse consumer instance

Hi, I'm on Kafka 0.8.2.2 and can't use the recent Consumer API described in
"Kafka: The Definitive Guide". My version of kafka-clients only has stubs for
the consumer methods.
This is why I'm using the "old" high-level consumer API, and it doesn't work
well for me.
I have a single actor that tries to read messages from a single Kafka topic
with a single partition.

Here is my code:

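import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.serializer.DefaultDecoder
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
// An implicit ExecutionContext must also be in scope for Future { ... }.

def consume(numberOfEvents: Int, await: Duration = 100.millis): List[MessageEnvelope] = {
    val consumerProperties = new Properties()
    consumerProperties.put("zookeeper.connect", kafkaConfig.zooKeeperConnectString)
    consumerProperties.put("group.id", consumerGroup)
    consumerProperties.put("auto.offset.reset", "smallest")

    // A new connector is created on every call to consume().
    val consumer = Consumer.create(new ConsumerConfig(consumerProperties))

    try {
      // One stream for the topic; keys are raw bytes, values are MessageEnvelope.
      val messageStreams = consumer.createMessageStreams(
        Predef.Map(kafkaConfig.topic -> 1),
        new DefaultDecoder,
        new MessageEnvelopeDecoder)

      // Read up to numberOfEvents messages on another thread.
      val receiveMessageFuture = Future[List[MessageEnvelope]] {
        messageStreams(kafkaConfig.topic)
          .flatMap(stream => stream.take(numberOfEvents).map(_.message()))
      }

      // Wait at most `await` for the messages to arrive.
      Await.result(receiveMessageFuture, await)
    } finally {
      consumer.shutdown()
    }
}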


It works fine, but I suppose I should reuse the ConsumerConnector instance

val consumer = Consumer.create(new ConsumerConfig(consumerProperties))

rather than create it for each "message poll".
I tried to keep a single instance of ConsumerConnector and messageStreams in
my singleton consumer actor.
It didn't go well; an exception is thrown:

2017-04-17_20:02:44.236 WARN  MessageEnvelopeConsumer - Error while consuming messages
kafka.common.MessageStreamsExistException: ZookeeperConsumerConnector can create message streams at most once
    at kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:151)
    at MessageEnvelopeConsumer.consume(MessageEnvelopeConsumer.scala:47)
at this line:

messageStreams(kafkaConfig.topic)
          .flatMap(stream => stream.take(numberOfEvents).map(_.message()))

Then I tried to reuse only the consumer and create messageStreams each time I
poll for messages.

That didn't go well either; the exception is:

2017-04-17_20:02:44.236 WARN  MessageEnvelopeConsumer - Error while consuming messages
kafka.common.MessageStreamsExistException: ZookeeperConsumerConnector can create message streams at most once
    at kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:151)
    at MessageEnvelopeConsumer.consume(MessageEnvelopeConsumer.scala:47)

The exception itself is clear to me, but I don't create two consumer
instances. I have loggers and counters in my test, and I'm 100% sure I do not
call

 Consumer.create(new ConsumerConfig(consumerProperties))

twice during the test.
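
For reference, here is a minimal sketch of the "create once, reuse" arrangement described above. It is not the code that produced the exception. It assumes the 0.8.x high-level consumer on the classpath and a reachable ZooKeeper and broker. ReusableConsumer and its constructor parameters are hypothetical names, and plain byte arrays stand in for MessageEnvelope to keep the sketch self-contained. It also swaps the Future/Await timeout for the consumer.timeout.ms setting, because a Future left blocked on the stream iterator would otherwise compete with the next poll. Note that the exception message and stack trace point at createMessageStreams rather than Consumer.create, so the sketch calls both exactly once per instance.

```scala
import java.util.Properties

import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, ConsumerIterator, ConsumerTimeoutException}

import scala.collection.mutable.ListBuffer

// Hypothetical helper (not from the original code): owns one connector and
// one stream iterator for its whole lifetime.
class ReusableConsumer(zkConnect: String, groupId: String, topic: String) {

  private val props = new Properties()
  props.put("zookeeper.connect", zkConnect)
  props.put("group.id", groupId)
  props.put("auto.offset.reset", "smallest")
  // Make hasNext() throw ConsumerTimeoutException after 100 ms without data
  // instead of blocking indefinitely (the default, -1).
  props.put("consumer.timeout.ms", "100")

  // Created exactly once per instance.
  private val connector: ConsumerConnector =
    Consumer.create(new ConsumerConfig(props))

  // createMessageStreams is also called exactly once per connector.
  private val streams = connector.createMessageStreams(Map(topic -> 1))

  // Single stream requested above, so take the first one and keep its iterator.
  private val iterator: ConsumerIterator[Array[Byte], Array[Byte]] =
    streams(topic).head.iterator()

  // Returns up to numberOfEvents raw payloads; may return fewer (or none)
  // if the timeout expires first.
  def consume(numberOfEvents: Int): List[Array[Byte]] = {
    val buffer = ListBuffer.empty[Array[Byte]]
    try {
      while (buffer.size < numberOfEvents && iterator.hasNext()) {
        buffer += iterator.next().message()
      }
    } catch {
      case _: ConsumerTimeoutException => // nothing more arrived in time
    }
    buffer.toList
  }

  // Call once, e.g. from the actor's postStop.
  def close(): Unit = connector.shutdown()
}
```

An actor would hold one ReusableConsumer as a field, call consume(n) for each poll, and call close() when it stops.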