Posted to users@kafka.apache.org by Gary Struthers <ag...@earthlink.net> on 2016/01/12 00:24:02 UTC
Java client 0.9 poll doesn't return
Calling the standalone client ("org.apache.kafka" % "kafka-clients" % "0.9.0.0") from Scala, consumer.poll never returns. I’ve tried both assign with a TopicPartition and subscribe, with various timeouts, and I’ve quintuple-checked the config properties. Here’s a Scala IDE worksheet:
val props = loadProperties(new StringBuilder("kafkaConsumer.properties"))
//> props : java.util.Properties = {key.deserializer=org.apache.kafka.common.se
//| rialization.StringDeserializer, auto.commit.interval.ms=1000, bootstrap.serv
//| ers=127.0.0.1:2181, enable.auto.commit=true, group.id=test, value.deserializ
//| er=org.apache.kafka.common.serialization.LongDeserializer, session.timeout.m
//| s=30000}
val topic = "debug-topic" //> topic : String = debug-topic
val topicList = List(topic).asJava //> topicList : java.util.List[String] = [debug-topic]
val consumer = new KafkaConsumer[String, String](props)
//> 15:07:22,501 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could
//| NOT find resource [logback.groovy]
//| 15:07:22,501 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could
//| NOT find resource [logback-test.xml]
//| 15:07:22,501 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Found
//| resource [logback.xml] at [file:/Users/garystruthers/git/dendrites/target/s
//| cala-2.11/classes/logback.xml]
//| 15:07:22,502 |-WARN in ch.qos.logback.classic.LoggerContext[default] - Resou
//| rce [logback.xml] occurs multiple times on the classpath.
//| 15:07:22,502 |-WARN in ch.qos.logback.classic.LoggerContext[default] - Resou
//| rce [logback.xml] occurs at [file:/Users/garystruthers/git/dendrites/bin/log
//| back.xml]
//| 15:07:22,502 |-WARN in ch.qos.logback.classic.LoggerContext[default] - Resou
//| rce [logback.xml] occurs at [file:/Users/garystruthers/git/dendrites/target/
//| scala-2.11/classes/logback.xml]
//| 15:07:22,592 |-INFO in ch.qos.logback.
//| Output exceeds cutoff limit.
val tp0 = new TopicPartition(topic, 0) //> tp0 : org.apache.kafka.common.TopicPartition = debug-topic-0
val topicPartitions = List(tp0).asJava //> topicPartitions : java.util.List[org.apache.kafka.common.TopicPartition] =
//| [debug-topic-0]
consumer.assign(topicPartitions)
//consumer.subscribe(topicList)
val records = consumer.poll(1000)
Gary
Re: Java client 0.9 poll doesn't return
Posted by Jason Gustafson <ja...@confluent.io>.
It looks like bootstrap.servers is pointed at ZooKeeper (127.0.0.1:2181). It
should point to the Kafka brokers instead. poll() currently blocks until the
group's coordinator is found, but sending the wrong kind of request to
ZooKeeper probably results in a server-side disconnect. In that case there is
never an error to propagate, so the consumer keeps retrying and poll() never
returns.
-Jason
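For reference, here is a minimal sketch of the worksheet with bootstrap.servers pointed at a broker instead of ZooKeeper. It rests on assumptions that are not in the original message: that a broker is listening on 127.0.0.1:9092 (the default Kafka listener port), that the values are meant to be Strings (the posted config uses LongDeserializer while the consumer is typed [String, String], so StringDeserializer is used here to make them agree), and the names PollSketch and consumerProps, which are made up for illustration.

```scala
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object PollSketch {
  // Hypothetical helper: builds the consumer config from the thread,
  // with the bootstrap address passed in rather than read from a file.
  def consumerProps(bootstrapServers: String): Properties = {
    val props = new Properties()
    // Must list Kafka brokers, not ZooKeeper (2181 is ZooKeeper's default client port).
    props.put("bootstrap.servers", bootstrapServers)
    props.put("group.id", "test")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    props.put("session.timeout.ms", "30000")
    props.put("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    // Assumption: values are Strings, matching KafkaConsumer[String, String].
    props.put("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    props
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a broker is reachable at 127.0.0.1:9092.
    val consumer = new KafkaConsumer[String, String](consumerProps("127.0.0.1:9092"))
    try {
      consumer.assign(List(new TopicPartition("debug-topic", 0)).asJava)
      // Waits up to 1000 ms for records once the broker has been reached.
      val records = consumer.poll(1000)
      for (record <- records.asScala)
        println(s"${record.offset}: ${record.key} -> ${record.value}")
    } finally {
      consumer.close()
    }
  }
}
```

If poll still hangs with a broker address, check that the host and port in bootstrap.servers match what the broker is actually listening on.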