Posted to users@kafka.apache.org by Serega Sheypak <se...@gmail.com> on 2017/04/18 08:30:13 UTC
Consuming from Kafka 8.2.2 from Actor. Can't reuse consumer instance
Hi, I'm on Kafka 0.8.2.2 and can't use the recent Consumer API described in
"Kafka: The Definitive Guide". My version of kafka-clients only has stubs for
the consumer methods.
This is why I'm using the "old" high-level consumer API, and it doesn't work
well for me.
I have a single actor that tries to read messages from a single Kafka topic
with a single partition.
Here is my code:
def consume(numberOfEvents: Int, await: Duration = 100.millis): List[MessageEnvelope] = {
  val consumerProperties = new Properties()
  consumerProperties.put("zookeeper.connect", kafkaConfig.zooKeeperConnectString)
  consumerProperties.put("group.id", consumerGroup)
  consumerProperties.put("auto.offset.reset", "smallest")
  // A new connector is created (and shut down) on every call.
  val consumer = Consumer.create(new ConsumerConfig(consumerProperties))
  try {
    val messageStreams = consumer.createMessageStreams(
      Predef.Map(kafkaConfig.topic -> 1),
      new DefaultDecoder,
      new MessageEnvelopeDecoder)
    val receiveMessageFuture = Future[List[MessageEnvelope]] {
      messageStreams(kafkaConfig.topic)
        .flatMap(stream => stream.take(numberOfEvents).map(_.message()))
    }
    Await.result(receiveMessageFuture, await)
  } finally {
    consumer.shutdown()
  }
}
It works fine, but I suppose I should reuse the ConsumerConnector instance
val consumer = Consumer.create(new ConsumerConfig(consumerProperties))
rather than create it for each "message poll".
I tried to keep a single instance of ConsumerConnector and messageStreams for
my singleton consumer actor.
It didn't go well. An exception is thrown:
2017-04-17_20:02:44.236 WARN MessageEnvelopeConsumer - Error while consuming messages
kafka.common.MessageStreamsExistException: ZookeeperConsumerConnector can create message streams at most once
    at kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:151)
    at MessageEnvelopeConsumer.consume(MessageEnvelopeConsumer.scala:47)
at this line:
messageStreams(kafkaConfig.topic)
.flatMap(stream => stream.take(numberOfEvents).map(_.message()))
Then I tried to reuse only the consumer and create the message streams each
time I poll for messages.
That didn't go well either; the exception is:
2017-04-17_20:02:44.236 WARN MessageEnvelopeConsumer - Error while consuming messages
kafka.common.MessageStreamsExistException: ZookeeperConsumerConnector can create message streams at most once
    at kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:151)
    at MessageEnvelopeConsumer.consume(MessageEnvelopeConsumer.scala:47)
The exception is obvious to me, but I don't create two consumer instances. I
have loggers and counters in my test, and I'm 100% sure I do not call
Consumer.create(new ConsumerConfig(consumerProperties))
twice during the test.
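
For reference, the exception text says the streams (not the connector) can be created at most once per connector, so one possible shape is to create both once and keep pulling from the same iterator on every poll. The following is only a minimal sketch of that idea under stated assumptions: FakeConnector and ReusableConsumer are hypothetical stand-ins invented for illustration and are not Kafka classes; the stand-in simply mimics the "at most once" behaviour so the pattern can be run without a broker.

```scala
// Hypothetical stand-in for the real connector (NOT a Kafka class): like the
// behaviour reported above, it refuses a second createMessageStreams call.
final class FakeConnector(messages: Seq[String]) {
  private var created = false

  def createMessageStreams(): Iterator[String] = {
    if (created)
      throw new IllegalStateException("can create message streams at most once")
    created = true
    messages.iterator
  }
}

// Hypothetical holder owned by the single consumer actor. The stream iterator
// is created lazily, exactly once, and every poll reads from that same iterator.
final class ReusableConsumer(connector: FakeConnector) {
  private lazy val stream: Iterator[String] = connector.createMessageStreams()

  def consume(numberOfEvents: Int): List[String] = {
    val buf = List.newBuilder[String]
    var n = 0
    // Pull elements one by one instead of calling stream.take(n): a Scala
    // Iterator should not be reused after take() has been called on it.
    while (n < numberOfEvents && stream.hasNext) {
      buf += stream.next()
      n += 1
    }
    buf.result()
  }
}
```

With the real high-level consumer the iterator's hasNext blocks until a message arrives unless consumer.timeout.ms is set, so an actual version of this would also need a timeout strategy; that part is not shown here.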