You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by shikhar <sh...@schmizz.net> on 2016/02/08 20:57:17 UTC

Kafka partition alignment for event time

My Flink job is doing aggregations on top of event-time based windowing
across Kafka partitions. As I have been developing and restarting it, the
state for the catch-up periods becomes unreliable -- lots of duplicate emits
for time windows already seen before, that I have to discard since my sink
can't handle it. There may be a bug in my job, but I wanted to clarify
whether this might be a flaw in Flink's handling of this.

I understand there is m:n mapping of partitions to sources depending on the
parallelism. Each source will have its own watermark. During catchup,
watermark progression can become pretty fragile, e.g. in my case where
there's n partitions and parallelism is 1.

I feel like some kind of event time alignment is needed across partitions. I
may be completely off here, so I look forward to your thoughts on this!



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Kafka partition alignment for event time

Posted by Stephan Ewen <se...@apache.org>.
Hi Shikar!

What you are seeing is that some streams (here the different Kafka
Partitions in one source) get merged in the source task. That happens
before watermarks are generated.
In such a case, records are out-of-order when they arrive at the
timestamp-extractor/watermark generator, and the watermark generator needs
to be implemented such that it is aware of these out-of-order records, and
uses some heuristic to generate watermarks. This is actually the general
case that one also has if timestamps are not ascending inside a single
Kafka partition.

You probably want to make use of the simple case, where timestamps are
ascending inside one Kafka partition, and use the
ascending-timestamp-extractor that auto-generates watermarks.
With Kafka, that one only works when there is 1:1 sources to partitions.


I think we can add some tooling that makes it possible to use the simple
ascending timestamp extraction also in cases where one parallel source task
has multiple Kafka partitions.
Effectively, the Kafka source has to internally generate the watermarks and
use the same "watermark union" technique as for example the join operator.

Here is the issue to track this:
https://issues.apache.org/jira/browse/FLINK-3375

Greetings,
Stephan


On Mon, Feb 8, 2016 at 9:51 PM, shikhar <sh...@schmizz.net> wrote:

> Stephan explained in that thread that we're picking the min watermark when
> doing operations that join streams from multiple sources. If we have m:n
> partition-source assignment where m>n, the source is going to end up with
> the max watermark. Having m<=n ensures that the lowest watermark is used.
>
> Re: automatic enforcement, perhaps allowing for more than 1 Kafka partition
> on a source should require opt-in, e.g. allowOversubscription()
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4788.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>

Re: Kafka partition alignment for event time

Posted by shikhar <sh...@schmizz.net>.
Yes that approach seems perfect Stephan, thanks for creating the JIRA!

It is not only when resetting to smallest, I have observed uneven progress
on partitions skewing the watermark any time the source is not caught up to
the head of each partition it is handling, like when stopping for a few mins
and starting it back up (the offsets it's resuming from are approx the same
number of messages behind).



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4819.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Kafka partition alignment for event time

Posted by Stephan Ewen <se...@apache.org>.
Thanks for filling us in.

If the problem comes from the fact that the difference between partitions
becomes high sometimes (when resetting to the smallest offset),
then this could probably be solved similarly as suggested here (
https://issues.apache.org/jira/browse/FLINK-3375) by running
a watermark assigner (ascending, threashold / whatever) per partition
inside the Kafka Source.

What do you think?


On Tue, Feb 9, 2016 at 3:01 PM, shikhar <sh...@schmizz.net> wrote:

> I am assigning timestamps using a  threshold-based extractor
> <https://gist.github.com/shikhar/2d9306e2ebd8ca89728c>   -- the static
> delta
> from last timestamp is probably sufficient and the PriorityQueue for
> allowing outliers not necessary, that is something I added while figuring
> out what was going on.
>
> The timestamps across partitions don't differ that much in normal operation
> when stream processing is caught up with the head of the partitions, so the
> thresholding works well. However, during catch-up, like if I stop for a bit
> & start the job again, or there is no offset in ZK and I'm using
> 'auto.offset.reset=smallest', the source tends to emit messages with much
> larger deviations, and the timestamp extraction which is not
> partition-aware
> will start providing an incorrect watermark.
>
>
> Aljoscha Krettek wrote
> > Hi,
> > in general it should not be a problem if one parallel instance of a sink
> > is responsible for several Kafka partitions. It can become a problem if
> > the timestamps in the different partitions differ by a lot and the
> > watermark assignment logic is not able to handle this.
> >
> > How are you assigning the timestamps/watermarks in your job?
> >
> > Cheers,
> > Aljoscha
> >> On 08 Feb 2016, at 21:51, shikhar &lt;
>
> > shikhar@
>
> > &gt; wrote:
> >>
> >> Stephan explained in that thread that we're picking the min watermark
> >> when
> >> doing operations that join streams from multiple sources. If we have m:n
> >> partition-source assignment where m>n, the source is going to end up
> with
> >> the max watermark. Having m<=n ensures that the lowest watermark is
> used.
> >>
> >> Re: automatic enforcement, perhaps allowing for more than 1 Kafka
> >> partition
> >> on a source should require opt-in, e.g. allowOversubscription()
> >>
> >>
> >>
> >> --
> >> View this message in context:
> >>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4788.html
> >> Sent from the Apache Flink User Mailing List archive. mailing list
> >> archive at Nabble.com.
>
>
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4816.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>

Re: Kafka partition alignment for event time

Posted by shikhar <sh...@schmizz.net>.
I am assigning timestamps using a  threshold-based extractor
<https://gist.github.com/shikhar/2d9306e2ebd8ca89728c>   -- the static delta
from last timestamp is probably sufficient and the PriorityQueue for
allowing outliers not necessary, that is something I added while figuring
out what was going on.

The timestamps across partitions don't differ that much in normal operation
when stream processing is caught up with the head of the partitions, so the
thresholding works well. However, during catch-up, like if I stop for a bit
& start the job again, or there is no offset in ZK and I'm using
'auto.offset.reset=smallest', the source tends to emit messages with much
larger deviations, and the timestamp extraction which is not partition-aware
will start providing an incorrect watermark.


Aljoscha Krettek wrote
> Hi,
> in general it should not be a problem if one parallel instance of a sink
> is responsible for several Kafka partitions. It can become a problem if
> the timestamps in the different partitions differ by a lot and the
> watermark assignment logic is not able to handle this.
> 
> How are you assigning the timestamps/watermarks in your job?
> 
> Cheers,
> Aljoscha
>> On 08 Feb 2016, at 21:51, shikhar &lt;

> shikhar@

> &gt; wrote:
>> 
>> Stephan explained in that thread that we're picking the min watermark
>> when
>> doing operations that join streams from multiple sources. If we have m:n
>> partition-source assignment where m>n, the source is going to end up with
>> the max watermark. Having m<=n ensures that the lowest watermark is used.
>> 
>> Re: automatic enforcement, perhaps allowing for more than 1 Kafka
>> partition
>> on a source should require opt-in, e.g. allowOversubscription()
>> 
>> 
>> 
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4788.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.





--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4816.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Kafka partition alignment for event time

Posted by Erdem Agaoglu <er...@gmail.com>.
Hi Robert,

I switched to SNAPSHOT and confirm that it works. Thanks!

On Thu, Feb 25, 2016 at 10:50 AM, Robert Metzger <rm...@apache.org>
wrote:

> Hi Erdem,
>
> FLINK-3440 has been resolved. The fix is merged to master. 1.0-SNAPSHOT
> should already contain the fix and it'll be in 1.0.0 (for which I'll post a
> release candidate today) as well.
>
> On Thu, Feb 18, 2016 at 3:24 PM, Erdem Agaoglu <er...@gmail.com>
> wrote:
>
>> Thanks Stephan
>>
>> On Thu, Feb 18, 2016 at 3:00 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> You are right, the checkpoints should contain all offsets.
>>>
>>> I created a Ticket for this:
>>> https://issues.apache.org/jira/browse/FLINK-3440
>>>
>>>
>>>
>>>
>>> On Thu, Feb 18, 2016 at 10:15 AM, agaoglu <er...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> On a related and a more exaggerated setup, our kafka-producer (flume)
>>>> seems
>>>> to send data to a single partition at a time and switches it every few
>>>> minutes. So when i run my flink datastream program for the first time,
>>>> it
>>>> starts on the *largest* offsets and shows something like this:
>>>>
>>>> . Fetched the following start offsets [FetchPartition {partition=7,
>>>> offset=15118832832}]
>>>> . Fetched the following start offsets [FetchPartition {partition=1,
>>>> offset=15203613236}]
>>>> . Fetched the following start offsets [FetchPartition {partition=2,
>>>> offset=15366811664}]
>>>> . Fetched the following start offsets [FetchPartition {partition=0,
>>>> offset=15393999709}]
>>>> . Fetched the following start offsets [FetchPartition {partition=8,
>>>> offset=15319475583}]
>>>> . Fetched the following start offsets [FetchPartition {partition=5,
>>>> offset=15482889767}]
>>>> . Fetched the following start offsets [FetchPartition {partition=6,
>>>> offset=15113885928}]
>>>> . Fetched the following start offsets [FetchPartition {partition=3,
>>>> offset=15182701991}]
>>>> . Fetched the following start offsets [FetchPartition {partition=4,
>>>> offset=15186569356}]
>>>>
>>>> For that instance flume happens to be sending data to partition-6 only,
>>>> so
>>>> other consumers sit idly. Working with default paralellism 4, only one
>>>> of
>>>> the 4 threads is able to source data and checkpointing logs reflect
>>>> that:
>>>>
>>>> Committing offsets [-915623761776, -915623761776, -915623761776,
>>>> -915623761776, -915623761776, -915623761776, -915623761776,
>>>> -915623761776,
>>>> -915623761776] to offset store: FLINK_ZOOKEEPER
>>>> Committing offsets [-915623761776, -915623761776, -915623761776,
>>>> -915623761776, -915623761776, -915623761776, 15114275927, -915623761776,
>>>> -915623761776] to offset store: FLINK_ZOOKEEPER
>>>> Committing offsets [-915623761776, -915623761776, -915623761776,
>>>> -915623761776, -915623761776, -915623761776, -915623761776,
>>>> -915623761776,
>>>> -915623761776] to offset store: FLINK_ZOOKEEPER
>>>> Committing offsets [-915623761776, -915623761776, -915623761776,
>>>> -915623761776, -915623761776, -915623761776, -915623761776,
>>>> -915623761776,
>>>> -915623761776] to offset store: FLINK_ZOOKEEPER
>>>>
>>>> This also means checkpoint will only contain the offset for
>>>> partition-6. So
>>>> if program is stopped and restarted at a later time, it restores the
>>>> offset
>>>> for partition-6 only and other partitions are started at the largest
>>>> offset.
>>>> So it's able to process unseen data in partition-6 but not others. Say
>>>> if
>>>> flume produces data to partition-3 when flink program is stopped,
>>>> they're
>>>> lost, while the data in partition-6 is not. This generally causes
>>>> multiple
>>>> (late-)windows to be fired after restart, because we now generate
>>>> watermarks
>>>> off partition-3 which says the windows of the unseen data in
>>>> partition-6 are
>>>> already complete.
>>>>
>>>> This also has a side effect of windows not triggering unless some
>>>> rebalancing is done beforehand. Since only 1 of the 4 threads will
>>>> source
>>>> data and generate watermarks, window triggers won't get watermarks from
>>>> other 3 sources and wait long past the watermarks generated from the
>>>> single
>>>> source.
>>>>
>>>> I know producers shouldn't work like that, but consumers shouldn't
>>>> care. I
>>>> think it may also create some edge cases even if things were not as
>>>> extreme
>>>> as ours. If checkpoints could contain offsets of all of the partitions
>>>> regardless of their contents, probably storing start offsets in first
>>>> run, i
>>>> guess that would solve the problems around restarting.
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4998.html
>>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>>> archive at Nabble.com.
>>>>
>>>
>>>
>>
>>
>> --
>> erdem agaoglu
>>
>
>


-- 
erdem agaoglu

Re: Kafka partition alignment for event time

Posted by Robert Metzger <rm...@apache.org>.
Hi Erdem,

FLINK-3440 has been resolved. The fix is merged to master. 1.0-SNAPSHOT
should already contain the fix and it'll be in 1.0.0 (for which I'll post a
release candidate today) as well.

On Thu, Feb 18, 2016 at 3:24 PM, Erdem Agaoglu <er...@gmail.com>
wrote:

> Thanks Stephan
>
> On Thu, Feb 18, 2016 at 3:00 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> You are right, the checkpoints should contain all offsets.
>>
>> I created a Ticket for this:
>> https://issues.apache.org/jira/browse/FLINK-3440
>>
>>
>>
>>
>> On Thu, Feb 18, 2016 at 10:15 AM, agaoglu <er...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> On a related and a more exaggerated setup, our kafka-producer (flume)
>>> seems
>>> to send data to a single partition at a time and switches it every few
>>> minutes. So when i run my flink datastream program for the first time, it
>>> starts on the *largest* offsets and shows something like this:
>>>
>>> . Fetched the following start offsets [FetchPartition {partition=7,
>>> offset=15118832832}]
>>> . Fetched the following start offsets [FetchPartition {partition=1,
>>> offset=15203613236}]
>>> . Fetched the following start offsets [FetchPartition {partition=2,
>>> offset=15366811664}]
>>> . Fetched the following start offsets [FetchPartition {partition=0,
>>> offset=15393999709}]
>>> . Fetched the following start offsets [FetchPartition {partition=8,
>>> offset=15319475583}]
>>> . Fetched the following start offsets [FetchPartition {partition=5,
>>> offset=15482889767}]
>>> . Fetched the following start offsets [FetchPartition {partition=6,
>>> offset=15113885928}]
>>> . Fetched the following start offsets [FetchPartition {partition=3,
>>> offset=15182701991}]
>>> . Fetched the following start offsets [FetchPartition {partition=4,
>>> offset=15186569356}]
>>>
>>> For that instance flume happens to be sending data to partition-6 only,
>>> so
>>> other consumers sit idly. Working with default paralellism 4, only one of
>>> the 4 threads is able to source data and checkpointing logs reflect that:
>>>
>>> Committing offsets [-915623761776, -915623761776, -915623761776,
>>> -915623761776, -915623761776, -915623761776, -915623761776,
>>> -915623761776,
>>> -915623761776] to offset store: FLINK_ZOOKEEPER
>>> Committing offsets [-915623761776, -915623761776, -915623761776,
>>> -915623761776, -915623761776, -915623761776, 15114275927, -915623761776,
>>> -915623761776] to offset store: FLINK_ZOOKEEPER
>>> Committing offsets [-915623761776, -915623761776, -915623761776,
>>> -915623761776, -915623761776, -915623761776, -915623761776,
>>> -915623761776,
>>> -915623761776] to offset store: FLINK_ZOOKEEPER
>>> Committing offsets [-915623761776, -915623761776, -915623761776,
>>> -915623761776, -915623761776, -915623761776, -915623761776,
>>> -915623761776,
>>> -915623761776] to offset store: FLINK_ZOOKEEPER
>>>
>>> This also means checkpoint will only contain the offset for partition-6.
>>> So
>>> if program is stopped and restarted at a later time, it restores the
>>> offset
>>> for partition-6 only and other partitions are started at the largest
>>> offset.
>>> So it's able to process unseen data in partition-6 but not others. Say if
>>> flume produces data to partition-3 when flink program is stopped, they're
>>> lost, while the data in partition-6 is not. This generally causes
>>> multiple
>>> (late-)windows to be fired after restart, because we now generate
>>> watermarks
>>> off partition-3 which says the windows of the unseen data in partition-6
>>> are
>>> already complete.
>>>
>>> This also has a side effect of windows not triggering unless some
>>> rebalancing is done beforehand. Since only 1 of the 4 threads will source
>>> data and generate watermarks, window triggers won't get watermarks from
>>> other 3 sources and wait long past the watermarks generated from the
>>> single
>>> source.
>>>
>>> I know producers shouldn't work like that, but consumers shouldn't care.
>>> I
>>> think it may also create some edge cases even if things were not as
>>> extreme
>>> as ours. If checkpoints could contain offsets of all of the partitions
>>> regardless of their contents, probably storing start offsets in first
>>> run, i
>>> guess that would solve the problems around restarting.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4998.html
>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>> archive at Nabble.com.
>>>
>>
>>
>
>
> --
> erdem agaoglu
>

Re: Kafka partition alignment for event time

Posted by Erdem Agaoglu <er...@gmail.com>.
Thanks Stephan

On Thu, Feb 18, 2016 at 3:00 PM, Stephan Ewen <se...@apache.org> wrote:

> You are right, the checkpoints should contain all offsets.
>
> I created a Ticket for this:
> https://issues.apache.org/jira/browse/FLINK-3440
>
>
>
>
> On Thu, Feb 18, 2016 at 10:15 AM, agaoglu <er...@gmail.com> wrote:
>
>> Hi,
>>
>> On a related and a more exaggerated setup, our kafka-producer (flume)
>> seems
>> to send data to a single partition at a time and switches it every few
>> minutes. So when i run my flink datastream program for the first time, it
>> starts on the *largest* offsets and shows something like this:
>>
>> . Fetched the following start offsets [FetchPartition {partition=7,
>> offset=15118832832}]
>> . Fetched the following start offsets [FetchPartition {partition=1,
>> offset=15203613236}]
>> . Fetched the following start offsets [FetchPartition {partition=2,
>> offset=15366811664}]
>> . Fetched the following start offsets [FetchPartition {partition=0,
>> offset=15393999709}]
>> . Fetched the following start offsets [FetchPartition {partition=8,
>> offset=15319475583}]
>> . Fetched the following start offsets [FetchPartition {partition=5,
>> offset=15482889767}]
>> . Fetched the following start offsets [FetchPartition {partition=6,
>> offset=15113885928}]
>> . Fetched the following start offsets [FetchPartition {partition=3,
>> offset=15182701991}]
>> . Fetched the following start offsets [FetchPartition {partition=4,
>> offset=15186569356}]
>>
>> For that instance flume happens to be sending data to partition-6 only, so
>> other consumers sit idly. Working with default paralellism 4, only one of
>> the 4 threads is able to source data and checkpointing logs reflect that:
>>
>> Committing offsets [-915623761776, -915623761776, -915623761776,
>> -915623761776, -915623761776, -915623761776, -915623761776, -915623761776,
>> -915623761776] to offset store: FLINK_ZOOKEEPER
>> Committing offsets [-915623761776, -915623761776, -915623761776,
>> -915623761776, -915623761776, -915623761776, 15114275927, -915623761776,
>> -915623761776] to offset store: FLINK_ZOOKEEPER
>> Committing offsets [-915623761776, -915623761776, -915623761776,
>> -915623761776, -915623761776, -915623761776, -915623761776, -915623761776,
>> -915623761776] to offset store: FLINK_ZOOKEEPER
>> Committing offsets [-915623761776, -915623761776, -915623761776,
>> -915623761776, -915623761776, -915623761776, -915623761776, -915623761776,
>> -915623761776] to offset store: FLINK_ZOOKEEPER
>>
>> This also means checkpoint will only contain the offset for partition-6.
>> So
>> if program is stopped and restarted at a later time, it restores the
>> offset
>> for partition-6 only and other partitions are started at the largest
>> offset.
>> So it's able to process unseen data in partition-6 but not others. Say if
>> flume produces data to partition-3 when flink program is stopped, they're
>> lost, while the data in partition-6 is not. This generally causes multiple
>> (late-)windows to be fired after restart, because we now generate
>> watermarks
>> off partition-3 which says the windows of the unseen data in partition-6
>> are
>> already complete.
>>
>> This also has a side effect of windows not triggering unless some
>> rebalancing is done beforehand. Since only 1 of the 4 threads will source
>> data and generate watermarks, window triggers won't get watermarks from
>> other 3 sources and wait long past the watermarks generated from the
>> single
>> source.
>>
>> I know producers shouldn't work like that, but consumers shouldn't care. I
>> think it may also create some edge cases even if things were not as
>> extreme
>> as ours. If checkpoints could contain offsets of all of the partitions
>> regardless of their contents, probably storing start offsets in first
>> run, i
>> guess that would solve the problems around restarting.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4998.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>
>
>


-- 
erdem agaoglu

Re: Kafka partition alignment for event time

Posted by Stephan Ewen <se...@apache.org>.
You are right, the checkpoints should contain all offsets.

I created a Ticket for this:
https://issues.apache.org/jira/browse/FLINK-3440




On Thu, Feb 18, 2016 at 10:15 AM, agaoglu <er...@gmail.com> wrote:

> Hi,
>
> On a related and a more exaggerated setup, our kafka-producer (flume) seems
> to send data to a single partition at a time and switches it every few
> minutes. So when i run my flink datastream program for the first time, it
> starts on the *largest* offsets and shows something like this:
>
> . Fetched the following start offsets [FetchPartition {partition=7,
> offset=15118832832}]
> . Fetched the following start offsets [FetchPartition {partition=1,
> offset=15203613236}]
> . Fetched the following start offsets [FetchPartition {partition=2,
> offset=15366811664}]
> . Fetched the following start offsets [FetchPartition {partition=0,
> offset=15393999709}]
> . Fetched the following start offsets [FetchPartition {partition=8,
> offset=15319475583}]
> . Fetched the following start offsets [FetchPartition {partition=5,
> offset=15482889767}]
> . Fetched the following start offsets [FetchPartition {partition=6,
> offset=15113885928}]
> . Fetched the following start offsets [FetchPartition {partition=3,
> offset=15182701991}]
> . Fetched the following start offsets [FetchPartition {partition=4,
> offset=15186569356}]
>
> For that instance flume happens to be sending data to partition-6 only, so
> other consumers sit idly. Working with default paralellism 4, only one of
> the 4 threads is able to source data and checkpointing logs reflect that:
>
> Committing offsets [-915623761776, -915623761776, -915623761776,
> -915623761776, -915623761776, -915623761776, -915623761776, -915623761776,
> -915623761776] to offset store: FLINK_ZOOKEEPER
> Committing offsets [-915623761776, -915623761776, -915623761776,
> -915623761776, -915623761776, -915623761776, 15114275927, -915623761776,
> -915623761776] to offset store: FLINK_ZOOKEEPER
> Committing offsets [-915623761776, -915623761776, -915623761776,
> -915623761776, -915623761776, -915623761776, -915623761776, -915623761776,
> -915623761776] to offset store: FLINK_ZOOKEEPER
> Committing offsets [-915623761776, -915623761776, -915623761776,
> -915623761776, -915623761776, -915623761776, -915623761776, -915623761776,
> -915623761776] to offset store: FLINK_ZOOKEEPER
>
> This also means checkpoint will only contain the offset for partition-6. So
> if program is stopped and restarted at a later time, it restores the offset
> for partition-6 only and other partitions are started at the largest
> offset.
> So it's able to process unseen data in partition-6 but not others. Say if
> flume produces data to partition-3 when flink program is stopped, they're
> lost, while the data in partition-6 is not. This generally causes multiple
> (late-)windows to be fired after restart, because we now generate
> watermarks
> off partition-3 which says the windows of the unseen data in partition-6
> are
> already complete.
>
> This also has a side effect of windows not triggering unless some
> rebalancing is done beforehand. Since only 1 of the 4 threads will source
> data and generate watermarks, window triggers won't get watermarks from
> other 3 sources and wait long past the watermarks generated from the single
> source.
>
> I know producers shouldn't work like that, but consumers shouldn't care. I
> think it may also create some edge cases even if things were not as extreme
> as ours. If checkpoints could contain offsets of all of the partitions
> regardless of their contents, probably storing start offsets in first run,
> i
> guess that would solve the problems around restarting.
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4998.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>

Re: Kafka partition alignment for event time

Posted by agaoglu <er...@gmail.com>.
Hi,

On a related and a more exaggerated setup, our kafka-producer (flume) seems
to send data to a single partition at a time and switches it every few
minutes. So when i run my flink datastream program for the first time, it
starts on the *largest* offsets and shows something like this:

. Fetched the following start offsets [FetchPartition {partition=7,
offset=15118832832}]
. Fetched the following start offsets [FetchPartition {partition=1,
offset=15203613236}]
. Fetched the following start offsets [FetchPartition {partition=2,
offset=15366811664}]
. Fetched the following start offsets [FetchPartition {partition=0,
offset=15393999709}]
. Fetched the following start offsets [FetchPartition {partition=8,
offset=15319475583}]
. Fetched the following start offsets [FetchPartition {partition=5,
offset=15482889767}]
. Fetched the following start offsets [FetchPartition {partition=6,
offset=15113885928}]
. Fetched the following start offsets [FetchPartition {partition=3,
offset=15182701991}]
. Fetched the following start offsets [FetchPartition {partition=4,
offset=15186569356}]

For that instance flume happens to be sending data to partition-6 only, so
other consumers sit idly. Working with default paralellism 4, only one of
the 4 threads is able to source data and checkpointing logs reflect that:

Committing offsets [-915623761776, -915623761776, -915623761776,
-915623761776, -915623761776, -915623761776, -915623761776, -915623761776,
-915623761776] to offset store: FLINK_ZOOKEEPER
Committing offsets [-915623761776, -915623761776, -915623761776,
-915623761776, -915623761776, -915623761776, 15114275927, -915623761776,
-915623761776] to offset store: FLINK_ZOOKEEPER
Committing offsets [-915623761776, -915623761776, -915623761776,
-915623761776, -915623761776, -915623761776, -915623761776, -915623761776,
-915623761776] to offset store: FLINK_ZOOKEEPER
Committing offsets [-915623761776, -915623761776, -915623761776,
-915623761776, -915623761776, -915623761776, -915623761776, -915623761776,
-915623761776] to offset store: FLINK_ZOOKEEPER

This also means checkpoint will only contain the offset for partition-6. So
if program is stopped and restarted at a later time, it restores the offset
for partition-6 only and other partitions are started at the largest offset.
So it's able to process unseen data in partition-6 but not others. Say if
flume produces data to partition-3 when flink program is stopped, they're
lost, while the data in partition-6 is not. This generally causes multiple
(late-)windows to be fired after restart, because we now generate watermarks
off partition-3 which says the windows of the unseen data in partition-6 are
already complete.

This also has a side effect of windows not triggering unless some
rebalancing is done beforehand. Since only 1 of the 4 threads will source
data and generate watermarks, window triggers won't get watermarks from
other 3 sources and wait long past the watermarks generated from the single
source.

I know producers shouldn't work like that, but consumers shouldn't care. I
think it may also create some edge cases even if things were not as extreme
as ours. If checkpoints could contain offsets of all of the partitions
regardless of their contents, probably storing start offsets in first run, i
guess that would solve the problems around restarting.



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4998.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Kafka partition alignment for event time

Posted by shikhar <sh...@schmizz.net>.
Hi Fabian,

Sorry, I should have been clearer. What I meant (or now know!) by duplicate
emits is that since the watermark is progressing more rapidly than the state
of the offsets on some partitions due to the source multiplexing more than 1
partition, when messages from the lagging partitions are passed on to
further operators specifically time-based windowing they get emitted
immediately, resulting in duplicate windows
(https://issues.apache.org/jira/browse/FLINK-2870).



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4817.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Kafka partition alignment for event time

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

where did you observe the duplicates, within Flink or in Kafka?
Please be aware that the Flink Kafka Producer does not provide exactly-once
consistency. This is not easily possible because Kafka does not support
transactional writes yet.

Flink's exactly-once guarantees are only valid within the Flink DataStream
program and for some sinks such as the RollingFileSink.

Cheers, Fabian

2016-02-09 10:21 GMT+01:00 Aljoscha Krettek <al...@apache.org>:

> Hi,
> in general it should not be a problem if one parallel instance of a sink
> is responsible for several Kafka partitions. It can become a problem if the
> timestamps in the different partitions differ by a lot and the watermark
> assignment logic is not able to handle this.
>
> How are you assigning the timestamps/watermarks in your job?
>
> Cheers,
> Aljoscha
> > On 08 Feb 2016, at 21:51, shikhar <sh...@schmizz.net> wrote:
> >
> > Stephan explained in that thread that we're picking the min watermark
> when
> > doing operations that join streams from multiple sources. If we have m:n
> > partition-source assignment where m>n, the source is going to end up with
> > the max watermark. Having m<=n ensures that the lowest watermark is used.
> >
> > Re: automatic enforcement, perhaps allowing for more than 1 Kafka
> partition
> > on a source should require opt-in, e.g. allowOversubscription()
> >
> >
> >
> > --
> > View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4788.html
> > Sent from the Apache Flink User Mailing List archive. mailing list
> archive at Nabble.com.
>
>

Re: Kafka partition alignment for event time

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
in general it should not be a problem if one parallel instance of a sink is responsible for several Kafka partitions. It can become a problem if the timestamps in the different partitions differ by a lot and the watermark assignment logic is not able to handle this.

How are you assigning the timestamps/watermarks in your job?

Cheers,
Aljoscha
> On 08 Feb 2016, at 21:51, shikhar <sh...@schmizz.net> wrote:
> 
> Stephan explained in that thread that we're picking the min watermark when
> doing operations that join streams from multiple sources. If we have m:n
> partition-source assignment where m>n, the source is going to end up with
> the max watermark. Having m<=n ensures that the lowest watermark is used.
> 
> Re: automatic enforcement, perhaps allowing for more than 1 Kafka partition
> on a source should require opt-in, e.g. allowOversubscription()
> 
> 
> 
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4788.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.


Re: Kafka partition alignment for event time

Posted by shikhar <sh...@schmizz.net>.
Stephan explained in that thread that we're picking the min watermark when
doing operations that join streams from multiple sources. If we have m:n
partition-source assignment where m>n, the source is going to end up with
the max watermark. Having m<=n ensures that the lowest watermark is used.

Re: automatic enforcement, perhaps allowing for more than 1 Kafka partition
on a source should require opt-in, e.g. allowOversubscription()



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4788.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Kafka partition alignment for event time

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
what do you mean by this? I think it should also work when setting parallelism to 1. If not, then there is either a problem with Flink or maybe something in the Data.

-Aljoscha
> On 08 Feb 2016, at 21:43, shikhar <sh...@schmizz.net> wrote:
> 
> Things make more sense after coming across
> http://mail-archives.apache.org/mod_mbox/flink-user/201512.mbox/%3CCANC1h_vVUT3BkFFck5wJA2ju_sSenxmE=Fiizr=ds6tBasYTJQ@mail.gmail.com%3E
> 
> I need to ensure the parallelism is at least the number of partitions. This
> seems like a gotcha that could be better documented or automatically
> enforced.
> 
> 
> 
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4786.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.


Re: Kafka partition alignment for event time

Posted by shikhar <sh...@schmizz.net>.
Things make more sense after coming across
http://mail-archives.apache.org/mod_mbox/flink-user/201512.mbox/%3CCANC1h_vVUT3BkFFck5wJA2ju_sSenxmE=Fiizr=ds6tBasYTJQ@mail.gmail.com%3E

I need to ensure the parallelism is at least the number of partitions. This
seems like a gotcha that could be better documented or automatically
enforced.



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4786.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.