Posted to user@flink.apache.org by Vladimir Stoyak <vs...@yahoo.com> on 2015/12/03 16:01:21 UTC

Read Kafka topic from the beginning

I see that Flink 0.10.1 now supports keyed schemas, which allow us to rely on Kafka topics set to "compact" retention for data persistence.

In our topology we want to enable log compaction on some topics and read each topic from the beginning when the topology starts or a component recovers. Does the current Kafka consumer implementation allow reading all messages in a topic from the beginning, or from a specific offset?

Thanks,
Vladimir

Re: Read Kafka topic from the beginning

Posted by Robert Metzger <rm...@apache.org>.
Hi Vladimir,

> Does the current Kafka consumer implementation allow reading all messages in a
> topic from the beginning, or from a specific offset?


For reading from the beginning, setting "auto.offset.reset" to "smallest"
will do the job.
Reading from a specific offset is not supported yet, but it is very
easy to implement on top of https://github.com/apache/flink/pull/1437,
which is why I filed a JIRA issue for this feature:
https://issues.apache.org/jira/browse/FLINK-3123
I'll implement it once the pull request is merged. Expect it sometime
next week.


On Thu, Dec 3, 2015 at 11:55 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi Vladimir!
>
> The Kafka Consumer can start from any offset internally (it does that, for
> example, when recovering from a failure).
>
> It should be fairly straightforward to set that offset field initially from a
> parameter. The FlinkKafkaConsumer is part of the user jars anyway. If you
> want, you can try creating a modified version that accepts that
> parameter, and then package it instead of the standard one.
>
> Greetings,
> Stephan
>
>
> On Thu, Dec 3, 2015 at 7:07 PM, Maximilian Michels <mx...@apache.org> wrote:
>
>> Hi Vladimir,
>>
>> Did you pass the properties to the FlinkKafkaConsumer?
>>
>> Cheers,
>> Max
>>
>> On Thu, Dec 3, 2015 at 7:06 PM, Vladimir Stoyak <vs...@yahoo.com>
>> wrote:
>> > I gave it a try, but it does not seem to help. Is it working for you?
>> >
>> > Thanks
>> >
>> > Sent from my iPhone
>> >
>> >> On Dec 3, 2015, at 6:11 PM, Vladimir Stoyak <vs...@yahoo.com> wrote:
>> >>
>> >> As far as I know, "auto.offset.reset" only controls what happens when the
>> >> offset is not available or out of range, doesn't it?
>> >>
>> >> Vladimir
>> >>
>> >>
>> >> On Thursday, December 3, 2015 5:58 PM, Maximilian Michels <mxm@apache.org> wrote:
>> >> Hi Vladimir,
>> >>
>> >> You may supply Kafka consumer properties when you create the FlinkKafkaConsumer.
>> >>
>> >> import java.util.Properties;
>> >>
>> >> Properties props = new Properties();
>> >>
>> >> // start from the largest offset (the default)
>> >> props.setProperty("auto.offset.reset", "largest");
>> >> // or: start from the smallest offset (overrides the line above)
>> >> props.setProperty("auto.offset.reset", "smallest");
>> >>
>> >> I don't think it is possible to start from a specific offset. The
>> >> offset is only unique per partition. You could modify the offsets in
>> >> the ZooKeeper state, but you really have to know what you're doing
>> >> then.
>> >>
>> >> Best regards,
>> >> Max
>> >>
>> >>
>> >>
>> >>> On Thu, Dec 3, 2015 at 4:01 PM, Vladimir Stoyak <vs...@yahoo.com> wrote:
>> >>> I see that Flink 0.10.1 now supports keyed schemas, which allow us to
>> >>> rely on Kafka topics set to "compact" retention for data persistence.
>> >>>
>> >>> In our topology we want to enable log compaction on some topics and
>> >>> read each topic from the beginning when the topology starts or a
>> >>> component recovers. Does the current Kafka consumer implementation allow
>> >>> reading all messages in a topic from the beginning, or from a specific offset?
>> >>>
>> >>> Thanks,
>> >>> Vladimir
>>
>
>

Re: Read Kafka topic from the beginning

Posted by Stephan Ewen <se...@apache.org>.
Hi Vladimir!

The Kafka Consumer can start from any offset internally (it does that, for
example, when recovering from a failure).

It should be fairly straightforward to set that offset field initially from a
parameter. The FlinkKafkaConsumer is part of the user jars anyway. If you
want, you can try creating a modified version that accepts that
parameter, and then package it instead of the standard one.
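Stephan's suggestion amounts to seeding the consumer's per-partition start offsets from a parameter before it begins fetching. Below is a self-contained sketch of only that seeding step. It assumes the offsets arrive as a map from (topic, partition) to offset. StartOffsetSeeding, Partition and seed are hypothetical names, not part of FlinkKafkaConsumer, and wiring this into a forked consumer would depend on its internals.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.ToLongFunction;

// Hypothetical sketch, not Flink code: seed per-partition start offsets from a user parameter.
final class StartOffsetSeeding {

    /** One partition of one topic; Kafka offsets are only meaningful per partition. */
    static final class Partition {
        final String topic;
        final int id;

        Partition(String topic, int id) {
            this.topic = Objects.requireNonNull(topic);
            this.id = id;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Partition)) {
                return false;
            }
            Partition other = (Partition) o;
            return id == other.id && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, id);
        }

        @Override
        public String toString() {
            return topic + "-" + id;
        }
    }

    /**
     * Returns the start offset for each assigned partition: the user-supplied offset
     * if one exists, otherwise whatever the fallback (e.g. the consumer's normal
     * committed-offset / auto.offset.reset logic) would choose.
     */
    static Map<Partition, Long> seed(Iterable<Partition> assigned,
                                     Map<Partition, Long> userOffsets,
                                     ToLongFunction<Partition> fallback) {
        Map<Partition, Long> start = new HashMap<>();
        for (Partition p : assigned) {
            Long requested = userOffsets.get(p);
            start.put(p, requested != null ? requested : fallback.applyAsLong(p));
        }
        return start;
    }
}
```

Partitions missing from the user map keep whatever the consumer would normally pick, so a partial map of offsets is safe to pass.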

Greetings,
Stephan

Re: Read Kafka topic from the beginning

Posted by Maximilian Michels <mx...@apache.org>.
Hi Vladimir,

Did you pass the properties to the FlinkKafkaConsumer?

Cheers,
Max

Re: Read Kafka topic from the beginning

Posted by Vladimir Stoyak <vs...@yahoo.com>.
I gave it a try, but it does not seem to help. Is it working for you?

Thanks

Re: Read Kafka topic from the beginning

Posted by Vladimir Stoyak <vs...@yahoo.com>.
As far as I know, "auto.offset.reset" only controls what happens when the offset is not available or out of range, doesn't it?
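That reading matches how the Kafka 0.8 consumer is generally described. An offset already committed for the consumer group takes precedence, and "auto.offset.reset" only applies when nothing is committed or the committed offset is out of range. The plain-Java model below captures that decision for one partition. It is a simplified assumption, not Kafka or Flink source, and OffsetResetModel and ResetPolicy are hypothetical names.

```java
import java.util.OptionalLong;

// Simplified model (an assumption, not Kafka/Flink code) of choosing a partition's start offset.
final class OffsetResetModel {

    /** Mirrors the two Kafka 0.8 values of auto.offset.reset. */
    enum ResetPolicy { SMALLEST, LARGEST }

    /**
     * @param committed offset stored for this consumer group and partition, if any
     * @param earliest  smallest offset the broker still retains
     * @param latest    offset the next written message will get (log end)
     */
    static long startOffset(OptionalLong committed, long earliest, long latest, ResetPolicy policy) {
        if (committed.isPresent()) {
            long c = committed.getAsLong();
            // A valid committed offset wins; the reset policy is never consulted.
            if (c >= earliest && c <= latest) {
                return c;
            }
        }
        // Nothing committed, or the committed offset is out of range: apply the policy.
        return policy == ResetPolicy.SMALLEST ? earliest : latest;
    }
}
```

Under this model, setting "smallest" changes nothing for a group that already has valid committed offsets, which would be consistent with the behaviour reported above.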

Vladimir

Re: Read Kafka topic from the beginning

Posted by Maximilian Michels <mx...@apache.org>.
Hi Vladimir,

You may supply Kafka consumer properties when you create the FlinkKafkaConsumer.

import java.util.Properties;

Properties props = new Properties();

// start from the largest offset (the default)
props.setProperty("auto.offset.reset", "largest");
// or: start from the smallest offset (overrides the line above)
props.setProperty("auto.offset.reset", "smallest");
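One caveat, assuming the consumer follows Kafka 0.8 high-level consumer semantics: the reset policy is only consulted when the group has no committed offset for a partition, so a previously used group.id may simply resume where it left off. The sketch below therefore also sets a fresh group.id. ReplayFromStartProps and its parameters are hypothetical, and the "zookeeper.connect" / "bootstrap.servers" keys are assumed to be the ones the 0.8-era Flink consumer expects.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper: consumer properties intended to replay a topic from the start.
final class ReplayFromStartProps {

    static Properties create(String zookeeperConnect, String brokers, String groupPrefix) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", zookeeperConnect);
        props.setProperty("bootstrap.servers", brokers);
        // A group id that has never committed offsets, so the reset policy applies.
        props.setProperty("group.id", groupPrefix + "-" + UUID.randomUUID());
        // With nothing committed, start from the smallest retained offset.
        props.setProperty("auto.offset.reset", "smallest");
        return props;
    }
}
```

The returned Properties would be passed to the FlinkKafkaConsumer constructor. The trade-off of a random group id is that every fresh start replays the whole topic and may leave unused offset entries behind in ZooKeeper.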

I don't think it is possible to start from a specific offset. The
offset is only unique per partition. You could modify the offsets in
the ZooKeeper state, but you really have to know what you're doing
then.
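To make "only unique per partition" concrete, a committed offset is stored per consumer group, topic, and partition. Assuming the ZooKeeper layout used by Kafka 0.8 consumers (/consumers/<group>/offsets/<topic>/<partition>), a hypothetical helper could compute the node that holds one partition's offset. Editing such nodes by hand is the risky step mentioned above.

```java
// Hypothetical helper: ZooKeeper node holding one partition's committed offset (Kafka 0.8 layout assumed).
final class ZkOffsetPath {

    static String of(String groupId, String topic, int partition) {
        // Layout: /consumers/<group>/offsets/<topic>/<partition>
        return "/consumers/" + groupId + "/offsets/" + topic + "/" + partition;
    }
}
```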

Best regards,
Max
