Posted to users@kafka.apache.org by Marcelo Oikawa <ma...@webradar.com> on 2017/05/17 20:38:11 UTC

Can't re-process topic

Hi, list.

I'm trying to re-process a topic in Kafka, but when I request the earliest
offsets, the code below always returns the same value as the latest offsets
(i.e., what I get when I replace OffsetRequest.EarliestTime() with
OffsetRequest.LatestTime()).

Is there something I'm missing? I'm pretty sure this code worked for me at
some point in our project. Today we're using Kafka 0.10, but our library is
spark-streaming-kafka_2.10:1.6.3, which depends on kafka-clients:0.8.2.1.
Our application also runs on Spark 1.6.3.

Any thoughts are welcome.

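import java.util.Collections;
import java.util.Map;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;

// The rest runs inside a method where `topic`, `partMetadata` (this partition's
// metadata) and getConsumer(host, port) (returns a SimpleConsumer) are defined.

// Get the partitions and offsets: ask for at most 1 offset before EarliestTime()
TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
        partMetadata.partitionId());
PartitionOffsetRequestInfo partitionRequestInfo = new
        PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1);

Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
        Collections.singletonMap(topicAndPartition, partitionRequestInfo);

// kafka.javaapi.OffsetRequest wraps the map; "spark" is the client id
OffsetRequest offsetRequest = new OffsetRequest(requestInfo,
        kafka.api.OffsetRequest.CurrentVersion(), "spark");
// The offset request must be sent to the partition's leader broker
OffsetResponse offsetResponse =
        getConsumer(partMetadata.leader().host(),
                partMetadata.leader().port()).getOffsetsBefore(offsetRequest);

if (offsetResponse.hasError()) {
    // Print the per-partition error code rather than just "true"
    System.out.println("error code: "
            + offsetResponse.errorCode(topic, partMetadata.partitionId()));
}
long[] offsets = offsetResponse.offsets(topic, partMetadata.partitionId());
System.out.println("offsets.size: " + offsets.length);
if (offsets.length > 0) {
    StringBuilder result = new StringBuilder();
    result.append("topic: ").append(topic).append("; ");
    result.append("partitionId: ").append(partMetadata.partitionId()).append("; ");
    result.append("offset: ").append(offsets[0]).append("; ");
    result.append("offsetSize: ").append(offsets.length).append(";");

    System.out.println(result.toString() + "\n");
}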
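EarliestTime() asks for the log start offset (the oldest offset the broker still retains) and LatestTime() for the log end offset (the offset the next message will receive). If both come back equal, the partition currently holds no retained messages, for example because retention has already deleted every segment, so there is nothing older to re-read. A minimal sketch of that interpretation, assuming both values were fetched with the request above; the class and method names are hypothetical and not part of the Kafka API:

```java
// Hypothetical helper (not a Kafka class): interprets the two values returned
// by getOffsetsBefore() for EarliestTime() and LatestTime() on one partition.
final class PartitionOffsets {
    private final long earliest; // log start offset: oldest offset still retained
    private final long latest;   // log end offset: offset the next message will get

    PartitionOffsets(long earliest, long latest) {
        if (earliest < 0 || latest < earliest) {
            throw new IllegalArgumentException(
                    "expected 0 <= earliest <= latest, got " + earliest + ", " + latest);
        }
        this.earliest = earliest;
        this.latest = latest;
    }

    /** Upper bound on retained messages (log compaction can leave gaps). */
    long retainedUpperBound() {
        return latest - earliest;
    }

    /** True when there is nothing left in this partition to re-read. */
    boolean isEmpty() {
        return earliest == latest;
    }

    /** Where a full re-read of this partition would have to start. */
    long startOffsetForReprocessing() {
        return earliest;
    }
}
```

For instance, earliest = 1200 and latest = 1500 means at most 300 messages can be re-processed, starting at offset 1200; earliest = latest = 1500 means the partition has nothing left to replay.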

Re: Can't re-process topic

Posted by João Peixoto <jo...@gmail.com>.
I'm not too familiar with Spark, but the "earliest"/"latest" configuration
is only relevant if your consumer does not hold a valid offset.

If you read up to offset N, you'll start from N when you restart.
If you start a new consumer, it has no committed offset yet, and that's
when the above configuration takes effect.
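The rule described above can be modelled as a small decision function. This is a simplified sketch, not Kafka's actual implementation; `StartPosition`, `ResetPolicy` and the parameter names are hypothetical. It assumes an out-of-range committed offset (e.g. one already removed by retention) counts as "not valid", which is also when Kafka falls back to the reset policy:

```java
import java.util.Optional;

// Hypothetical reset policy, mirroring "earliest"/"latest".
enum ResetPolicy { EARLIEST, LATEST }

// Simplified model (not Kafka's code) of where a consumer starts on one partition.
final class StartPosition {
    private StartPosition() {}

    /**
     * @param committed offset previously committed for this consumer group, if any
     * @param logStart  earliest offset the broker still retains
     * @param logEnd    offset the next produced message will receive
     */
    static long resolve(Optional<Long> committed, ResetPolicy policy,
                        long logStart, long logEnd) {
        if (committed.isPresent()) {
            long c = committed.get();
            // A valid committed offset always wins; the policy is ignored.
            if (c >= logStart && c <= logEnd) {
                return c;
            }
            // Out of range (e.g. deleted by retention): fall back to the policy.
        }
        // No usable committed offset: only now does "earliest"/"latest" matter.
        return policy == ResetPolicy.EARLIEST ? logStart : logEnd;
    }
}
```

With a committed offset of 1400 on a partition spanning 1200 to 1500, resolve(...) returns 1400 whether the policy is EARLIEST or LATEST, which is why changing the reset setting alone does not trigger reprocessing.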

To reprocess a topic, you need to either reset your consumer's offset to 0
(more precisely, to the earliest offset the broker still retains) or change
your consumer group name to one that doesn't exist yet. I believe the former
is preferable.
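For the second option, a rough sketch of the consumer parameters, assuming the 0.8-era config names that spark-streaming-kafka 1.6.3 passes through to Kafka (there `auto.offset.reset` takes `smallest`/`largest`; the 0.10 Java consumer spells these `earliest`/`latest`). The helper name, broker address and group id are hypothetical. Note that Spark 1.6's direct stream tracks offsets in its checkpoint rather than in Kafka, so a start without that checkpoint also falls back to `auto.offset.reset`. Neither option can bring back messages that retention has already deleted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds kafkaParams for a fresh consumer group, so there is
// no committed offset and the reset policy decides where reading starts.
final class ReprocessParams {
    private ReprocessParams() {}

    static Map<String, String> forNewGroup(String brokers, String newGroupId) {
        Map<String, String> params = new HashMap<>();
        params.put("metadata.broker.list", brokers); // 0.8-style broker list
        params.put("group.id", newGroupId);          // must not exist yet
        // 0.8-era value meaning "start from the earliest retained offset"
        params.put("auto.offset.reset", "smallest");
        return params;
    }
}
```

For example, ReprocessParams.forNewGroup("broker1:9092", "reprocess-run-2") yields a map whose `group.id` is `reprocess-run-2` and whose `auto.offset.reset` is `smallest`.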


On Wed, May 17, 2017 at 1:38 PM Marcelo Oikawa <ma...@webradar.com>
wrote:

> Hi, list.
>
> I'm trying to re-process a topic in Kafka but when I request for earliest
> offsets. The code below always returns the same value as latest offsets (if
> I replace OffsetRequest.EarliestTime() to OffsetRequest.LatestTime()).
>
> Is there something that I missing? I'm pretty sure that this code worked
> for me at some point in our project. Today, we're using Kafka 0.10 but our
> library is spark-streaming-kafka_2.10:1.6.3 and that depends on
> kafka-clients:0.8.2.1. And also, our application is running on Spark 1.6.3.
>
> Any thoughts are welcome.
>
> // Get the partitions and offsets
> TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
> partMetadata.partitionId());
> PartitionOffsetRequestInfo partitionRequestInfo = new
> PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1);
>
> Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
> Collections.singletonMap(topicAndPartition, partitionRequestInfo);
>
> OffsetRequest offsetRequest = new OffsetRequest(requestInfo,
> kafka.api.OffsetRequest.CurrentVersion(), "spark");
> OffsetResponse offsetResponse =
> getConsumer(partMetadata.leader().host(),
> partMetadata.leader().port()).getOffsetsBefore(offsetRequest);
>
> System.out.println(offsetResponse.hasError());
> long[] offsets = offsetResponse.offsets(topic, partMetadata.partitionId());
> System.out.println("offsets.size: " + offsets.length);
> if (offsets.length > 0) {
>     StringBuilder result = new StringBuilder();
>     result.append("topic: ").append(topic).append("; ");
>     result.append("partitionId:
> ").append(partMetadata.partitionId()).append("; ");
>     result.append("offset: ").append(offsets[0]).append("; ");
>     result.append("offsetSize: ").append(offsets.length).append(";");
>
>     System.out.println(result.toString() + "\n");
> }
>