You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by ajeet singh <aj...@gmail.com> on 2015/01/27 12:31:59 UTC

CommitOffsets for Simple consumer is returning EOF Exception

Hi ,

I am new to Kafka, I have a use case in which My Consumer can't use auto
commit offset feature, I have to go with option of manual Commit. With High
level Consumer I have have constrain that consumer can commit only current
offset, but in my case I will be committing some previous off-set value.

So only possible solution seems like to use Simple Consumer.  This is how I
am using Simple Consumer for Commit offset :

TopicAndPartition topicAndPartition = new
TopicAndPartition(topic,partition);
Map<TopicAndPartition, OffsetMetadataAndError> requestInfo = new
HashMap<TopicAndPartition, OffsetMetadataAndError>();
requestInfo.put(topicAndPartition, new
OffsetMetadataAndError(0L,"no_metadata", (short) 0));
kafka.javaapi.OffsetCommitRequest request = new
kafka.javaapi.OffsetCommitRequest("test",
requestInfo1,kafka.api.OffsetRequest.CurrentVersion(), 0, clientName);
kafka.javaapi.OffsetCommitResponse response =
consumer.commitOffsets(request);


I am getting EOFException

Oops:java.io.EOFException: Received -1 when reading from channel, socket
has likely been closed.
java.io.EOFException: Received -1 when reading from channel, socket has
likely been closed.
at kafka.utils.Utils$.read(Utils.scala:376)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
at kafka.consumer.SimpleConsumer.fetchOffsets(SimpleConsumer.scala:138)
at
kafka.javaapi.consumer.SimpleConsumer.fetchOffsets(SimpleConsumer.scala:99)
at
com.vizury.rtb.realtimelogging.OfflineLogConsumer.commitOffsetTest(OfflineLogConsumer.java:205)
at
com.vizury.rtb.realtimelogging.OfflineLogConsumer.run(OfflineLogConsumer.java:147)
at
com.vizury.rtb.realtimelogging.OfflineLogConsumer.main(OfflineLogConsumer.java:31)


Any help ?? same error I am getting with fetchOffsets() method, where as
 getOffsetsBefore() is working fine.

Re: CommitOffsets for Simple consumer is returning EOF Exception

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Ajeet,

Which version of Kafka are you using? I remember the OffsetCommitRequest's
requestInfo should be a map of topicPartition -> OffsetAndMetadata, not
OffsetMetadataAndError.

Guozhang

On Tue, Jan 27, 2015 at 3:31 AM, ajeet singh <aj...@gmail.com>
wrote:

> Hi ,
>
> I am new to Kafka, I have a use case in which My Consumer can't use auto
> commit offset feature, I have to go with option of manual Commit. With High
> level Consumer I have have constrain that consumer can commit only current
> offset, but in my case I will be committing some previous off-set value.
>
> So only possible solution seems like to use Simple Consumer.  This is how I
> am using Simple Consumer for Commit offset :
>
> TopicAndPartition topicAndPartition = new
> TopicAndPartition(topic,partition);
> Map<TopicAndPartition, OffsetMetadataAndError> requestInfo = new
> HashMap<TopicAndPartition, OffsetMetadataAndError>();
> requestInfo.put(topicAndPartition, new
> OffsetMetadataAndError(0L,"no_metadata", (short) 0));
> kafka.javaapi.OffsetCommitRequest request = new
> kafka.javaapi.OffsetCommitRequest("test",
> requestInfo1,kafka.api.OffsetRequest.CurrentVersion(), 0, clientName);
> kafka.javaapi.OffsetCommitResponse response =
> consumer.commitOffsets(request);
>
>
> I am getting EOFException
>
> Oops:java.io.EOFException: Received -1 when reading from channel, socket
> has likely been closed.
> java.io.EOFException: Received -1 when reading from channel, socket has
> likely been closed.
> at kafka.utils.Utils$.read(Utils.scala:376)
> at
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at
>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
> at
>
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> at kafka.consumer.SimpleConsumer.fetchOffsets(SimpleConsumer.scala:138)
> at
> kafka.javaapi.consumer.SimpleConsumer.fetchOffsets(SimpleConsumer.scala:99)
> at
>
> com.vizury.rtb.realtimelogging.OfflineLogConsumer.commitOffsetTest(OfflineLogConsumer.java:205)
> at
>
> com.vizury.rtb.realtimelogging.OfflineLogConsumer.run(OfflineLogConsumer.java:147)
> at
>
> com.vizury.rtb.realtimelogging.OfflineLogConsumer.main(OfflineLogConsumer.java:31)
>
>
> Any help ?? same error I am getting with fetchOffsets() method, where as
>  getOffsetsBefore() is working fine.
>



-- 
-- Guozhang