Posted to users@kafka.apache.org by Krzysztof Zarzycki <k....@gmail.com> on 2016/01/19 09:03:21 UTC

Can't save Kafka offset in Zookeeper

Hi Kafka users,
I have an issue with saving Kafka offsets to Zookeeper through
OffsetCommitRequest. It's the same issue I found unanswered on SO, so let me
borrow the description:
http://stackoverflow.com/questions/33897683/cant-save-kafka-offset-in-zookeeper

"I've installed Zookeeper and Kafka from Ambari, on CentOS 7.

Ambari version: 2.1.2.1
Zookeeper version: 3.4.6.2.3
Kafka version: 0.8.2.2.3
Java Kafka client:kafka_2.10, 0.8.2.2

I'm trying to save the Kafka offset, using the following code:

SimpleConsumer simpleConsumer = new SimpleConsumer(host, port, soTimeout, bufferSize, clientId);
TopicAndPartition topicAndPartition = new TopicAndPartition(topicName, partitionId);
Map<TopicAndPartition, OffsetAndMetadata> requestInfo = new HashMap<>();
requestInfo.put(topicAndPartition, new OffsetAndMetadata(readOffset, "", ErrorMapping.NoError()));
OffsetCommitRequest offsetCommitRequest = new OffsetCommitRequest(groupName, requestInfo, correlationId, clientName, (short) 0);
simpleConsumer.commitOffsets(offsetCommitRequest);
simpleConsumer.close();

But when I run this, I get the following error in my client:

java.io.EOFException: Received -1 when reading from channel, socket
has likely been closed.

Also in the Kafka logs I have the following error:

[2015-11-24 15:38:53,566] ERROR Closing socket for /192.168.186.1
because of error (kafka.network.Processor)
java.nio.BufferUnderflowException
    at java.nio.Buffer.nextGetIndex(Buffer.java:498)
    at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:406)
    at kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:73)
    at kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:68)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.Range.foreach(Range.scala:141)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:68)
    at kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:65)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.immutable.Range.foreach(Range.scala:141)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at kafka.api.OffsetCommitRequest$.readFrom(OffsetCommitRequest.scala:65)
    at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:47)
    at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:47)
    at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:55)
    at kafka.network.Processor.read(SocketServer.scala:547)
    at kafka.network.Processor.run(SocketServer.scala:405)
    at java.lang.Thread.run(Thread.java:745)

Now I've also downloaded and installed the official Kafka 0.8.2.2 release from
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz
and it works OK; you can save the Kafka offset without any error.

Can anybody give me some directions on why the Ambari Kafka is failing to
save the offset?

P.S.: I know that if versionId is 0 (in OffsetCommitRequest), then the
offset is actually saved in Zookeeper.
"
My only difference (IMHO irrelevant) is that I'm using HDP version 2.3.2;
other than that, the versions are the same.
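The P.S. above can be checked from outside the client, too: with versionId 0 the committed offset lands in a well-known znode under /consumers, so you can inspect ZooKeeper directly to see whether the commit really happened. A minimal sketch of the path layout (class and group/topic names here are placeholders, not from the original post):

```java
public class ZkOffsetPath {
    // Znode where the ZooKeeper-based (versionId 0) offset commit is stored:
    // /consumers/<group>/offsets/<topic>/<partition>
    static String offsetPath(String group, String topic, int partition) {
        return "/consumers/" + group + "/offsets/" + topic + "/" + partition;
    }

    public static void main(String[] args) {
        // Inspect it after the commit, e.g. with:
        //   bin/zookeeper-shell.sh <zk-host:port> get <path>
        System.out.println(offsetPath("my-group", "my-topic", 0));
    }
}
```

Reading that znode after the commit tells you whether the offset actually reached ZooKeeper, independently of what the client reports.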

Do you guys have any hints on what could be wrong? Is something wrong
with my use of offset committing, or is there a conflict of versions?
Any hints would be highly appreciated :)
Cheers,
Krzysztof

Re: Can't save Kafka offset in Zookeeper

Posted by Dana Powers <da...@gmail.com>.
0.9 brokers should be backwards compatible, yes. But as with everything --
you should verify in your own environment.

-Dana

On Tue, Jan 19, 2016 at 9:55 AM, Krzysztof Zarzycki <k....@gmail.com>
wrote:


Re: Can't save Kafka offset in Zookeeper

Posted by Krzysztof Zarzycki <k....@gmail.com>.
Thank you Dana!
I see...
The pity is that Hortonworks claims in their release notes for HDP 2.3.2
that:
 5.9. Kafka

HDP 2.3.2 provides Kafka 0.8.2, with no additional Apache patches.
(
https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.2/bk_HDP_RelNotes/content/patch_kafka.html
 )

So I assumed that Kafka would come as a stable release...

So you say that upgrading to HDP 2.3.4 would help? I see in the release
notes that it is going to upgrade Kafka to 0.9.0.
I'm afraid of this upgrade, as I don't know whether Spark Streaming
(spark-streaming-kafka) will support Kafka 0.9.

What do you think? Is Kafka 0.9 completely backward compatible? I.e., will
clients (both producers & consumers) using libraries for 0.8.2 (both
"kafka-clients" as well as straight "kafka") connecting to it still work
after the upgrade?

Thanks for your answer,
Krzysztof




On Tue, 19 Jan 2016 at 18:39, Dana Powers <da...@gmail.com>
wrote:


Re: Can't save Kafka offset in Zookeeper

Posted by Dana Powers <da...@gmail.com>.
Sadly, HDP 2.3.2 shipped with a broken OffsetCommit API (the 0.8.2-beta
version). You should use the Apache releases, or upgrade to HDP 2.3.4.0 or
later.

-Dana
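If the broker's OffsetCommit API is broken, one possible workaround (not suggested in the original reply) is to bypass it and write the offset znode directly, since the layout the old ZooKeeper-based consumers use is well known: the offset is stored as its decimal string form at /consumers/<group>/offsets/<topic>/<partition>. A hedged sketch of the path and payload; the actual write via org.apache.zookeeper.ZooKeeper is only indicated in comments, and the class name is a placeholder:

```java
import java.nio.charset.StandardCharsets;

public class ZkOffsetWriter {
    // Znode used by the ZooKeeper-based offset storage:
    // /consumers/<group>/offsets/<topic>/<partition>
    static String offsetPath(String group, String topic, int partition) {
        return "/consumers/" + group + "/offsets/" + topic + "/" + partition;
    }

    // The old consumers store the offset as its ASCII decimal representation.
    static byte[] offsetData(long offset) {
        return Long.toString(offset).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // With a connected org.apache.zookeeper.ZooKeeper client you would
        // then do something like:
        //   zk.setData(offsetPath(group, topic, partition), offsetData(offset), -1);
        // creating the parent znodes first if they don't exist yet.
        System.out.println(offsetPath("my-group", "my-topic", 0));
        System.out.println(new String(offsetData(42L), StandardCharsets.UTF_8));
    }
}
```

This sidesteps the broker entirely, so it works even against the broken build, at the cost of talking to ZooKeeper yourself.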

On Tue, Jan 19, 2016 at 12:03 AM, Krzysztof Zarzycki <k....@gmail.com>
wrote:
