Posted to dev@kafka.apache.org by Vahid S Hashemian <va...@us.ibm.com> on 2016/05/09 22:09:41 UTC
Java Consumer Issue
I am trying to create a very simple (new) consumer in Java using the
trunk:
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class NewConsumerLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "group1");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singletonList("t1"));

        try {
            while (true) {
                // Poll timeout is in milliseconds.
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf(
                            "topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                            record.topic(), record.partition(), record.offset(),
                            record.key(), record.value());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
This code raises the following exception when polling:
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topic_metadata': Error reading array of size 160817, only 30 bytes available
        at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
        at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:380)
        at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:179)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:973)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
        at NewConsumerLoop.main(NewConsumerLoop.java:97)
Can anyone spot what the issue is? Thanks.
Regards,
--Vahid Hashemian
Re: Java Consumer Issue
Posted by Vahid S Hashemian <va...@us.ibm.com>.
Thanks Ismael.
I had the same suspicion and had run the build (./gradlew jar) as you
suggested.
But unfortunately, that didn't resolve the issue.
Regards,
--Vahid Hashemian
From: Ismael Juma <is...@juma.me.uk>
To: dev@kafka.apache.org
Date: 05/09/2016 05:05 PM
Subject: Re: Java Consumer Issue
Sent by: ismaelj@gmail.com
Vahid, it looks like the broker is old and it doesn't include the metadata
request/response changes. This could happen if you haven't done ./gradlew
jar in a while.
Ismael
Re: Java Consumer Issue
Posted by Ismael Juma <is...@juma.me.uk>.
Vahid, it looks like the broker is old and it doesn't include the metadata
request/response changes. This could happen if you haven't done ./gradlew
jar in a while.
Ismael
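
To illustrate how a version mismatch of this kind could plausibly produce an "Error reading array of size N, only M bytes available" message, here is a minimal sketch. It is a toy format and not Kafka's actual wire protocol or parser: the class SchemaMismatchDemo, its method names, and the two "extra" fields are all hypothetical. A sender writes an older layout, a receiver expects a newer layout with additional fields, and the receiver ends up interpreting unrelated bytes as an array length.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Toy illustration only: this is NOT Kafka's wire format or parser. */
public class SchemaMismatchDemo {

    // Writes a length-prefixed string (int16 length, then UTF-8 bytes).
    static void putString(ByteBuffer buf, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        buf.putShort((short) bytes.length);
        buf.put(bytes);
    }

    // Reads a length-prefixed string written by putString.
    static String getString(ByteBuffer buf) {
        byte[] bytes = new byte[buf.getShort()];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // "Old" sender: one broker entry (id, host, port), then a topic array.
    static ByteBuffer encodeOldLayout() {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.putInt(1);                 // broker array size
        buf.putInt(0);                 // broker id
        putString(buf, "localhost");   // host
        buf.putInt(9092);              // port
        buf.putInt(1);                 // topic array size
        putString(buf, "t1");          // topic name
        buf.putInt(0);                 // partition array size
        buf.flip();
        return buf;
    }

    // "New" receiver: expects an extra string per broker and an extra int
    // before the topic array (both hypothetical), so it drifts out of step.
    static String decodeWithNewLayout(ByteBuffer buf) {
        int brokers = buf.getInt();
        for (int i = 0; i < brokers; i++) {
            buf.getInt();      // broker id
            getString(buf);    // host
            buf.getInt();      // port
            getString(buf);    // extra field the old sender never wrote
        }
        buf.getInt();          // extra int the old sender never wrote
        int topics = buf.getInt(); // actually bytes of the topic name
        if (topics > buf.remaining()) {
            return "Error reading array of size " + topics
                    + ", only " + buf.remaining() + " bytes available";
        }
        return "read " + topics + " topics";
    }

    public static void main(String[] args) {
        System.out.println(decodeWithNewLayout(encodeOldLayout()));
    }
}
```

The implausibly large array size in the message is the giveaway: the bytes are well-formed for one layout but are being read with another.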
On Mon, May 9, 2016 at 11:32 PM, Vahid S Hashemian <
vahidhashemian@us.ibm.com> wrote:
> Hi Ismael,
>
> According to the server logs
> INFO Kafka version : 0.10.0.0-SNAPSHOT
> (org.apache.kafka.common.utils.AppInfoParser)
>
> I am running my server in a command line session, and my consumer in my
> IDE (eclipse). That's when I get the exception.
> However, I just noticed when I also run the server in my IDE, I don't get
> this exception.
>
> Regards,
> --Vahid Hashemian
Re: Java Consumer Issue
Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Ismael,
According to the server logs
INFO Kafka version : 0.10.0.0-SNAPSHOT
(org.apache.kafka.common.utils.AppInfoParser)
I am running my server in a command line session, and my consumer in my
IDE (eclipse). That's when I get the exception.
However, I just noticed when I also run the server in my IDE, I don't get
this exception.
Regards,
--Vahid Hashemian
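
Since the behaviour differs between the command-line server and the IDE, one thing that may be worth checking is which jar or class directory each process actually loads its Kafka classes from. The following is a minimal sketch using only standard JDK calls; the class name ClassOrigin and the method describe are hypothetical helpers, not part of Kafka. In the consumer one would pass a Kafka client class such as KafkaConsumer.class, which assumes kafka-clients is on the classpath; the sketch itself uses JDK and local classes so that it runs on its own.

```java
import java.net.URL;
import java.security.CodeSource;

public class ClassOrigin {

    // Returns the jar or directory a class was loaded from, or a fallback
    // string when the JVM does not report one (e.g. for core JDK classes).
    static String describe(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        URL location = (source == null) ? null : source.getLocation();
        return type.getName() + " <- "
                + (location == null ? "(no code source reported)" : location);
    }

    public static void main(String[] args) {
        // In the consumer, one would pass a Kafka client class here instead,
        // e.g. KafkaConsumer.class (requires kafka-clients on the classpath).
        System.out.println(describe(ClassOrigin.class));
        System.out.println(describe(String.class));
    }
}
```

If the reported location is an older jar than the one just built, that would be consistent with a stale build on one side of the connection.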
From: Ismael Juma <is...@juma.me.uk>
To: dev@kafka.apache.org
Date: 05/09/2016 03:14 PM
Subject: Re: Java Consumer Issue
Sent by: ismaelj@gmail.com
Hi Vahid,
What is the broker version?
Ismael
Re: Java Consumer Issue
Posted by Ismael Juma <is...@juma.me.uk>.
Hi Vahid,
What is the broker version?
Ismael