Posted to dev@flink.apache.org by Niels Basjes <Ni...@basjes.nl> on 2021/12/19 15:46:13 UTC

Sending watermarks into Kafka

Hi,

About a year ago I spoke at the Flink Forward conference (
https://www.youtube.com/watch?v=wqRDyrE3dwg ) about handling development
problems regarding streaming applications and handling the lack of events
in a stream.
Something I spoke about towards the end of this talk was the idea to ship
the watermarks of a Flink topology into the intermediate transport between
applications so you wouldn't need to recreate them.

At that time it was just an idea; today I'm actually trying to build it
and see whether it is possible.

So the class of applications I work on usually do a keyBy on something like
a SessionId, SensorId or IP address.
In low traffic scenarios this means that in Kafka some partitions are
completely idle which makes Windows/GroupBy type operations impossible (in
my talk I explain it a lot better).

I have a test setup right now to play around with this and I'm running into
a bit of a conceptual hurdle for which I'm looking for help.

My goal is to ship the watermarks from within a topology into Kafka and
then let a follow up application extract those watermarks again and simply
continue.
The new SinkWriter interface has a void writeWatermark(Watermark
watermark) method
that seems intended for this kind of thing.
The basic operations, like writing a watermark into Kafka, reading it back
and then recreating the watermark, work in my test setup (very messy
code, but it works).

My hurdle has to do with the combination of
- different parallelism numbers between Flink and Kafka (how do I ship 2
watermarks into 3 partitions)
- the fact that if you do a keyBy (both in Flink and Kafka) there is a
likely mismatch between the Flink 'partition' and the Kafka 'partition'.
- processing speed differences between various threads (like session "A"
needs more CPU cycles/time/processing than session "B") will lead to
skewing of the progression between them.
- watermarks in separate threads in a single Flink topology are not
synchronized (they cannot and should not be).

Has anyone any pointers on possible ways to handle this?

Right now my only idea is to ship the watermark into all partitions (as
they do not have a key!) and let the consuming application determine the
"real watermark" based on the mix of watermarks coming in from the upstream
threads.
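
A minimal sketch of the "ship the watermark into all partitions" idea, in
plain Python without any Kafka client. The function name, the payload layout
and the use of (partition, payload) tuples are assumptions made for
illustration only; nothing here is an existing Flink or Kafka API.

```python
from typing import Dict, List, Tuple


def broadcast_watermark(task_id: int, timestamp_ms: int,
                        num_partitions: int) -> List[Tuple[int, str]]:
    """Return one (partition, payload) pair per partition for one watermark.

    The payload format is hypothetical; a watermark has no key, so it is
    simply copied to every partition.
    """
    payload = f"WM|task={task_id}|ts={timestamp_ms}"
    return [(partition, payload) for partition in range(num_partitions)]


# Two producer tasks writing into three partitions.
records = broadcast_watermark(0, 1_000, 3) + broadcast_watermark(1, 1_200, 3)

# Group by partition to see what each consumer of a partition would receive.
per_partition: Dict[int, List[str]] = {}
for partition, payload in records:
    per_partition.setdefault(partition, []).append(payload)
```

With 2 producing tasks and 3 partitions, every partition ends up carrying
both watermarks, so a reader of any single partition can see all upstream
tasks.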

All suggestions and ideas are appreciated.

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Sending watermarks into Kafka

Posted by Niels Basjes <Ni...@basjes.nl>.
I just realized I did not mention a key concept in my idea:
All watermarks are sent into all Kafka partitions so every consumer gets
all watermarks from all upstream producing tasks.

On Wed, Dec 22, 2021 at 11:33 AM Niels Basjes <Ni...@basjes.nl> wrote:

> > Perhaps for this an explicit config is needed per source id (pattern?):
> > - Important producer: expire after 24 hours.
> > - Yet another IOT sensor: expire after 1 minute.
> > - and a default.
>
> Do note that this actually does something I'm not a fan of: Introduce a
> "Processing time" conclusion (the timeout of the watermarks) in an "Event
> time" stream.
> I consider mixing these two in general something to be avoided.
>
> For the situation a producer is "signing of forever" there is the option
> of it sending out a final closing watermark indicating "MAX_LONG".
> But for the "the sensor died" scenario I do not yet see a better way to do
> this.
>
> Niels

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Sending watermarks into Kafka

Posted by Niels Basjes <Ni...@basjes.nl>.
> Perhaps for this an explicit config is needed per source id (pattern?):
> - Important producer: expire after 24 hours.
> - Yet another IOT sensor: expire after 1 minute.
> - and a default.

Do note that this actually does something I'm not a fan of: it introduces a
"processing time" conclusion (the timeout of the watermarks) into an "event
time" stream.
In general I consider mixing these two something to be avoided.

For the situation where a producer is "signing off forever" there is the
option of it sending out a final closing watermark indicating "MAX_LONG".
But for the "the sensor died" scenario I do not yet see a better way to do
this.
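
A small sketch of why a closing "MAX_LONG" watermark works, assuming the
consumer combines producers by taking the minimum of their last seen
watermarks (that combination rule is an assumption here). The names are
hypothetical; MAX_LONG is the value of Java's Long.MAX_VALUE.

```python
from typing import Dict

MAX_LONG = 2**63 - 1  # same value as Java's Long.MAX_VALUE


def effective_watermark(last_seen: Dict[str, int]) -> int:
    """Combine per-producer watermarks by taking the minimum."""
    return min(last_seen.values())


last_seen = {"task-0": 1_000, "task-1": 1_500}
before = effective_watermark(last_seen)   # held back by task-0

# task-0 signs off forever and sends its final closing watermark.
last_seen["task-0"] = MAX_LONG
after = effective_watermark(last_seen)    # task-0 no longer holds anything back
```

Once a producer has reported MAX_LONG it can never be the minimum again, so
it stops constraining the downstream watermark without any timeout.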

Niels



On Wed, Dec 22, 2021 at 11:22 AM Niels Basjes <Ni...@basjes.nl> wrote:

>
> On Tue, Dec 21, 2021 at 9:42 PM Matthias J. Sax <mj...@apache.org> wrote:
>
>> Your high level layout make sense. However, I think there are a few
>> problems doing it with Flink:
>>
>> (1) How to encode the watermark? It could break downstream consumers
>> that don't know what to do with them (eg, crash on deserialization)?
>> There is no guarantee that only a downstream Flink job consumes the data
>> (nor that the downstream Flink was upgraded to understand those input
>> watermarks).
>>
>
> What I'm doing in my experiments right now is to have created a
> WatermarkSerde interface that is able to serialize a Watermark into my own
> format.
> This WatermarkSerde would then be implemented by the application builder
> and injected in both the Sink and Source implementation of whatever
> transport I like to use (Kafka, ...) .
> The WatermarkSerde would also need something to determine if the provided
> element is a watermark or a normal element.
>
> This allows me to use any format I like in my application and to make the
> watermark be a valid yet slightly strange record in my stream.
> So at the receiving end the record can simply deserialize and then
> determine if it is a watermark or not.
>
> As a consequence all downstream implementations can simply be instructed
> on how they can see if the event is really an event or a Watermark.
>
> A default/reference implementation that can be used where a textual format
> is expected (like json or xml) is also an option.
>
> Do note that at the application level I consider the presence of these
> special elements to be a part of the definition of the streaming interface
> of this (set of) applications.
> Regardless of the "where and how" of the implementation: the streaming
> interface now "contains watermarks" in a defined way.
>
>> Using a control message, the downstream KafkaConsumer would
>> "filter" control messages / watermarks automatically, and user's would
>> opt-in explicitly thus providing a safe and backward compatible upgrade
>> path.
>>
>
> Yes, that would work.
>
>> (2) About all the metadata: it will be hard for Flink to track all those
>> things, but it would be simpler to push it into the storage layer IMHO.
>> For example, if Flink does dynamically scaling, it adds a new producer
>> to the group and Kafka takes care of the rest. Thus, Flink only needs to
>> provide the actual watermark timestamp.
>>
>
> Yes, but still there is a need for an administration that tracks all
> producers and their watermarks.
> As far as I can tell right now the complexity will be roughly the same in
> all directions.
>
>
>>      On particular problem is error handling scenario: what happens if a
>> producer fails and one downstream watermarks is missing?
>
>
> In my mind the watermarks between all producer tasks are not synchronized
> as they are initially created in an independent way on different machines.
> So the sketch I have in mind right now to handle this would
> - keep a list of all "last seen watermark timestamps" per application per
> original taskId per input partition (this last one plays a role if you read
> from multiple partitions with a single instance).
> - when a new watermark arrives,
>    - persist it if later than what we have so far.
>    - Only if it is newer than ALL others we have output a watermark into
> the application stream.
>    - only outputting a watermark if we have a valid 'last watermark' for
> all producing tasks.
> and
> - taking into account how many producer tasks there are: Only output if we
> have at least one for each one.
> - essentially doing a reset if the producer parallelism changes and
> change the expectations on how many parallels we need.
>    - This is a tricky one as we will get a mix of old and new watermarks
> during this transition.
>
> This would be done in all receiving Source instances separately as they
> are generally on different machines.
>
> So in this failure scenario all Sources would wait with their watermarks
> until they have received the watermarks from the recovered instance of this
> producer.
>
>> What happens if
>> Flink thinks a worker is dead and replaces it with a different one, but
>> the worker is actually a zombie and might still write watermarks into
>> the topic?
>
>
> Yes, that would be a serious problem for all implementation directions.
> I don't know yet.
>
>> Flink cannot fence off the zombie. -- Pushing it into Kafka
>> allows to handle those cases much easier IMHO.
>>
>
> Good point.
>
>
>> (3) About ordering: Kafka provides strict ordering guarantees per
>> partitions. Thus, there is no problem here. Of course, if there are
>> multiple producers writing into the same partition, you get interleaved
>> writes (that's why you need to know how many producer you got, to be
>> able to reason about it downstream).
>>
>
> That is why my sketch keeps watermarks for each task for all producing
> applications and essentially determines a new watermark from the total set.
>
> The big problem there (also for the Kafka route) would be handling
> producers leaving.
> In my sketch "new producers" is easy to handle; just add the new
> expectation to wait for.
>
> Leaving producers is not easy to handle as I see no way to determine
> the distinction between
> - "leaving, continue without it"
> and
> - "down, wait for it to be back a little while later".
>
> Perhaps for this an explicit config is needed per source id (pattern?):
> - Important producer: expire after 24 hours.
> - Yet another IOT sensor: expire after 1 minute.
> - and a default.
>
>> Hope this helps.
>>
>
> Yes it does.
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Sending watermarks into Kafka

Posted by Niels Basjes <Ni...@basjes.nl>.
On Tue, Dec 21, 2021 at 9:42 PM Matthias J. Sax <mj...@apache.org> wrote:

> Your high level layout make sense. However, I think there are a few
> problems doing it with Flink:
>
> (1) How to encode the watermark? It could break downstream consumers
> that don't know what to do with them (eg, crash on deserialization)?
> There is no guarantee that only a downstream Flink job consumes the data
> (nor that the downstream Flink was upgraded to understand those input
> watermarks).
>

In my experiments I have created a WatermarkSerde interface that can
serialize a Watermark into my own format.
This WatermarkSerde would then be implemented by the application builder
and injected into both the Sink and Source implementation of whatever
transport I like to use (Kafka, ...).
The WatermarkSerde would also need a way to determine whether the provided
element is a watermark or a normal element.

This allows me to use any format I like in my application and to make the
watermark be a valid yet slightly strange record in my stream.
So the receiving end can simply deserialize the record and then
determine whether it is a watermark or not.

As a consequence all downstream implementations can simply be instructed on
how they can see if the event is really an event or a Watermark.

A default/reference implementation that can be used where a textual format
is expected (like json or xml) is also an option.

Do note that at the application level I consider the presence of these
special elements to be a part of the definition of the streaming interface
of this (set of) applications.
Regardless of the "where and how" of the implementation: the streaming
interface now "contains watermarks" in a defined way.
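
A minimal sketch of such a serde for a JSON-based stream. This is not the
WatermarkSerde interface from the experiment (its shape is not shown in this
thread); the function names and the "__watermark__" marker field are
hypothetical choices for illustration.

```python
import json
from typing import Optional

# Hypothetical marker field; a real stream would reserve a name that normal
# events can never use.
WATERMARK_FIELD = "__watermark__"


def serialize_watermark(timestamp_ms: int) -> bytes:
    """Encode a watermark as a valid, if slightly strange, JSON record."""
    return json.dumps({WATERMARK_FIELD: timestamp_ms}).encode("utf-8")


def extract_watermark(raw: bytes) -> Optional[int]:
    """Return the watermark timestamp, or None for a normal element."""
    record = json.loads(raw.decode("utf-8"))
    if isinstance(record, dict) and WATERMARK_FIELD in record:
        return int(record[WATERMARK_FIELD])
    return None
```

Because the watermark is an ordinary JSON object, a consumer that does not
know about watermarks still deserializes it without crashing, although it
would then treat it as a strange event unless told how to recognize it.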

> Using a control message, the downstream KafkaConsumer would
> "filter" control messages / watermarks automatically, and user's would
> opt-in explicitly thus providing a safe and backward compatible upgrade
> path.
>

Yes, that would work.

> (2) About all the metadata: it will be hard for Flink to track all those
> things, but it would be simpler to push it into the storage layer IMHO.
> For example, if Flink does dynamically scaling, it adds a new producer
> to the group and Kafka takes care of the rest. Thus, Flink only needs to
> provide the actual watermark timestamp.
>

Yes, but there is still a need for some bookkeeping that tracks all
producers and their watermarks.
As far as I can tell right now the complexity will be roughly the same in
all directions.


>      On particular problem is error handling scenario: what happens if a
> producer fails and one downstream watermarks is missing?


In my mind the watermarks between all producer tasks are not synchronized
as they are initially created in an independent way on different machines.
So the sketch I have in mind right now to handle this would
- keep a list of all "last seen watermark timestamps" per application, per
original taskId, per input partition (this last one plays a role if you read
from multiple partitions with a single instance).
- when a new watermark arrives,
   - persist it if it is later than what we have so far.
   - only if it is newer than ALL others we have, output a watermark into
the application stream.
   - only output a watermark if we have a valid 'last watermark' for
all producing tasks.
- take into account how many producer tasks there are: only output if we
have at least one watermark for each of them.
- essentially do a reset if the producer parallelism changes and
change the expectation of how many parallel producers we need.
   - This is a tricky one as we will get a mix of old and new watermarks
during this transition.

This would be done in all receiving Source instances separately as they are
generally on different machines.

So in this failure scenario all Sources would hold back their watermarks
until they have received the watermarks from the recovered instance of this
producer.
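
The bookkeeping described above could look roughly like the sketch below.
It is one possible reading, not the code from the experiment: it keys only
on the producer task id (not on application or input partition), and it
emits the minimum of all last seen watermarks once every expected producer
task has reported, which is the "effective watermark" rule suggested
elsewhere in this thread. The class and method names are hypothetical.

```python
from typing import Dict, Optional


class WatermarkTracker:
    """Tracks the last seen watermark per producer task in one Source."""

    def __init__(self, expected_producers: int) -> None:
        self.expected_producers = expected_producers
        self.last_seen: Dict[int, int] = {}   # producer task id -> timestamp
        self.emitted: Optional[int] = None    # last watermark sent downstream

    def on_watermark(self, task_id: int, timestamp_ms: int) -> Optional[int]:
        """Record a watermark; return a new downstream watermark or None."""
        previous = self.last_seen.get(task_id)
        # Persist only if it is later than what we have for this task.
        if previous is None or timestamp_ms > previous:
            self.last_seen[task_id] = timestamp_ms
        # Hold back until every producer task has reported at least once.
        if len(self.last_seen) < self.expected_producers:
            return None
        effective = min(self.last_seen.values())
        # Only emit when the combined watermark actually advances.
        if self.emitted is None or effective > self.emitted:
            self.emitted = effective
            return effective
        return None
```

With two expected producers, nothing is emitted after the first task
reports; once the second reports, the slower of the two determines the
output. A failed producer simply stops advancing its entry, so the combined
watermark stalls until it recovers.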

> What happens if
> Flink thinks a worker is dead and replaces it with a different one, but
> the worker is actually a zombie and might still write watermarks into
> the topic?


Yes, that would be a serious problem for all implementation directions.
I don't know yet.

> Flink cannot fence off the zombie. -- Pushing it into Kafka
> allows to handle those cases much easier IMHO.
>

Good point.


> (3) About ordering: Kafka provides strict ordering guarantees per
> partitions. Thus, there is no problem here. Of course, if there are
> multiple producers writing into the same partition, you get interleaved
> writes (that's why you need to know how many producer you got, to be
> able to reason about it downstream).
>

That is why my sketch keeps watermarks for each task for all producing
applications and essentially determines a new watermark from the total set.

The big problem there (also for the Kafka route) would be handling
producers leaving.
In my sketch "new producers" is easy to handle; just add the new
expectation to wait for.

Producers leaving is not easy to handle, as I see no way to
distinguish between
- "leaving, continue without it"
and
- "down, wait for it to be back a little while later".

Perhaps for this an explicit config is needed per source id (pattern?):
- Important producer: expire after 24 hours.
- Yet another IOT sensor: expire after 1 minute.
- and a default.
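
A sketch of what such a per-source expiry config might look like. The
source id patterns, the default of 10 minutes and the function names are
all made up for illustration; only the 24 hours and 1 minute values come
from the list above. Note that the check compares wall-clock seconds, so it
is a processing-time decision.

```python
from fnmatch import fnmatchcase
from typing import List, Tuple

DEFAULT_EXPIRY_S = 10 * 60  # hypothetical default

# (source id pattern, expiry in seconds); the first matching pattern wins.
EXPIRY_RULES: List[Tuple[str, int]] = [
    ("billing-*", 24 * 60 * 60),   # important producer: 24 hours
    ("iot-sensor-*", 60),          # yet another IoT sensor: 1 minute
]


def expiry_for(source_id: str) -> int:
    """Look up the expiry for a source id, falling back to the default."""
    for pattern, seconds in EXPIRY_RULES:
        if fnmatchcase(source_id, pattern):
            return seconds
    return DEFAULT_EXPIRY_S


def is_expired(source_id: str, last_seen_s: float, now_s: float) -> bool:
    """True if the source has been silent for longer than its expiry."""
    return now_s - last_seen_s > expiry_for(source_id)
```

An expired source would then be dropped from the set of producers the
consumer waits for, which is exactly the "leaving, continue without it"
decision.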

> Hope this helps.
>

Yes it does.

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Sending watermarks into Kafka

Posted by "Matthias J. Sax" <mj...@apache.org>.
Your high level layout makes sense. However, I think there are a few
problems doing it with Flink:

(1) How to encode the watermark? It could break downstream consumers 
that don't know what to do with them (eg, crash on deserialization)? 
There is no guarantee that only a downstream Flink job consumes the data 
(nor that the downstream Flink was upgraded to understand those input 
watermarks).

     Using a control message, the downstream KafkaConsumer would
"filter" control messages / watermarks automatically, and users would
opt in explicitly, thus providing a safe and backward-compatible upgrade
path.


(2) About all the metadata: it will be hard for Flink to track all those 
things, but it would be simpler to push it into the storage layer IMHO. 
For example, if Flink does dynamic scaling, it adds a new producer
to the group and Kafka takes care of the rest. Thus, Flink only needs to 
provide the actual watermark timestamp.

     One particular problem is the error-handling scenario: what happens if a
producer fails and one downstream watermark is missing? What happens if
Flink thinks a worker is dead and replaces it with a different one, but
the worker is actually a zombie and might still write watermarks into
the topic? Flink cannot fence off the zombie. -- Pushing it into Kafka
makes those cases much easier to handle IMHO.


(3) About ordering: Kafka provides strict ordering guarantees per
partition. Thus, there is no problem here. Of course, if there are
multiple producers writing into the same partition, you get interleaved
writes (that's why you need to know how many producers you have, to be
able to reason about it downstream).


Hope this helps.

-Matthias




On 12/21/21 5:13 AM, Niels Basjes wrote:
> Hi,
> 
> Like I said I've only just started thinking about how this can be
> implemented (I'm currently still lacking a lot of knowledge).
> So at this point I do not yet see why solving this in the transport (like
> Kafka) is easier than solving it in the processing engine (like Flink).
> In the normal scenarios we have today all watermarks are (re)created in the
> processing engine so instinctively I would expect that to be the
> "right place".
> 
> Also as far as I can see right now in order to make this happen the
> watermarks must all be annotated with things like the applicationId (to
> handle multiple producers), the timestamp (duh), the taskId and the total
> number of tasks in the producing system: So the producers or the broker
> must attach this information to the watermarks.
> It should also be able to handle dynamic scaling of producing applications
> and handling the entering and leaving of producers into a topic is also a
> thing to consider.
> [Reading this back; is this the reason for it to be easier in the
> transport?]
> 
> I do realize that even if this is implemented in the processing engine some
> constraints may be needed to allow this to work: Like having some kind of
> ordering guarantees per partition in a topic.
> 
> Do you guys know of any article/blog/paper/mail discussion/... that
> describes/discusses this?
> 
> Niels
> 
> On Mon, Dec 20, 2021 at 4:35 PM Matthias J. Sax <mj...@mailbox.org.invalid>
> wrote:
> 
>> I think this problem should be tackled inside Kafka, not Flink.
>>
>> Kafka already has internal control messages to write transaction
>> markers. Those could be extended to carry watermark information. It
>> would be best to generalize those as "user control messages" and
>> watermarks could just be one application.
>>
>> In addition, we might need something like a "producer group" to track
>> how many producers are writing into a partition: this would allow us to
>> inform downstream consumers how many different watermarks they need to
>> track.
>>
>> It's not an easy problem to solve, but trying to solve it at the
>> processing layer without integrating with the storage layer is even
>> harder.
>>
>> -Matthias
>>
>> On 12/20/21 01:57, Niels Basjes wrote:
>>> I'm reading the Pulsar PIP and noticed another thing to take into
>>> account:
>>> multiple applications (with each a different parallelism) that all write
>>> into the same topic.
>>>
>>> On Mon, 20 Dec 2021, 10:45 Niels Basjes, <Ni...@basjes.nl> wrote:
>>>
>>>> Hi Till,
>>>>
>>>> This morning I also realized what you call an 'effective watermark' is
>>>> indeed what is needed.
>>>> I'm going to read up on what Pulsar has planned.
>>>>
>>>> What I realized is that the consuming application must be aware of the
>>>> parallelism of the producing application, which is independent of the
>>>> partitions in the intermediate transport.
>>>>
>>>> Assume I produce with parallelism 2 and have 5 Kafka partitions, which I
>>>> then read with parallelism 3; then in the consuming (parallelism 3)
>>>> application I must wait for watermarks from each original input before I
>>>> can continue: which is 2.
>>>> Also we must assume that those watermarks are created at different
>>>> timestamps.
>>>> So my current assessment is that the watermark records must include at
>>>> least the timestamp, the number of the thread for this watermark and the
>>>> total number of threads.
>>>>
>>>> Niels
>>>>
>>>>
>>>> On Mon, Dec 20, 2021 at 10:10 AM Till Rohrmann <tr...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Niels,
>>>>>
>>>>> if you have multiple inputs going into a single Kafka partition then you
>>>>> have to calculate the effective watermark by looking at the min watermark
>>>>> from all inputs. You could insert a Flink operator that takes care of it
>>>>> and then writes to a set of partitions in 1:n relationship. Alternatively,
>>>>> you could take a look at Pulsar that wants to support this functionality
>>>>> out of the box [1].
>>>>>
>>>>> [1] https://github.com/apache/pulsar/issues/12267
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>
>>>>
>>>> --
>>>> Best regards / Met vriendelijke groeten,
>>>>
>>>> Niels Basjes
>>>>
>>>
>>
> 
> 

Re: Sending watermarks into Kafka

Posted by Niels Basjes <Ni...@basjes.nl>.
Hi,

As I said, I've only just started thinking about how this can be
implemented (I still lack a lot of knowledge).
So at this point I do not yet see why solving this in the transport (like
Kafka) is easier than solving it in the processing engine (like Flink).
In the normal scenarios we have today all watermarks are (re)created in the
processing engine so instinctively I would expect that to be the
"right place".

Also as far as I can see right now in order to make this happen the
watermarks must all be annotated with things like the applicationId (to
handle multiple producers), the timestamp (duh), the taskId and the total
number of tasks in the producing system: So the producers or the broker
must attach this information to the watermarks.
It should also be able to handle dynamic scaling of producing applications
and handling the entering and leaving of producers into a topic is also a
thing to consider.
[Reading this back; is this the reason for it to be easier in the
transport?]
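
A sketch of a watermark record carrying the annotations listed above. The
class, its field names and the helper function are hypothetical; they only
illustrate which metadata would travel with each watermark and how a
consumer could notice a change in producer parallelism.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WatermarkRecord:
    application_id: str   # distinguishes multiple producing applications
    task_id: int          # index of the producing task, 0-based
    total_tasks: int      # parallelism of the producing application
    timestamp_ms: int     # the watermark itself

    def __post_init__(self) -> None:
        if not 0 <= self.task_id < self.total_tasks:
            raise ValueError("task_id must be in [0, total_tasks)")


def parallelism_changed(previous: WatermarkRecord,
                        current: WatermarkRecord) -> bool:
    """True if the same application now reports a different task count."""
    return (previous.application_id == current.application_id
            and previous.total_tasks != current.total_tasks)
```

A consumer that sees total_tasks change for an application knows it has to
reset how many producer tasks it waits for, although during the transition
it may still receive a mix of old and new records.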

I do realize that even if this is implemented in the processing engine some
constraints may be needed to allow this to work: Like having some kind of
ordering guarantees per partition in a topic.

Do you guys know of any article/blog/paper/mail discussion/... that
describes/discusses this?

Niels

On Mon, Dec 20, 2021 at 4:35 PM Matthias J. Sax <mj...@mailbox.org.invalid>
wrote:

> I think this problem should be tackled inside Kafka, not Flink.
>
> Kafka already has internal control messages to write transaction
> markers. Those could be extended to carry watermark information. It
> would be best to generalize those as "user control messages" and
> watermarks could just be one application.
>
> In addition, we might need something like a "producer group" to track
> how many producers are writing into a partition: this would allow us to
> inform downstream consumers how many different watermarks they need to
> track.
>
> It's not an easy problem to solve, but trying to solve it at the
> processing layer, without integrating with the storage layer, is even
> harder.
>
> -Matthias


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Sending watermarks into Kafka

Posted by "Matthias J. Sax" <mj...@mailbox.org.INVALID>.
I think this problem should be tackled inside Kafka, not Flink.

Kafka already has internal control messages to write transaction 
markers. Those could be extended to carry watermark information. It 
would be best to generalize those as "user control messages" and 
watermarks could just be one application.

In addition, we might need something like a "producer group" to track
how many producers are writing into a partition: this would allow us to
inform downstream consumers how many different watermarks they need to track.

It's not an easy problem to solve, but trying to solve it at the
processing layer, without integrating with the storage layer, is even
harder.

-Matthias

On 12/20/21 01:57, Niels Basjes wrote:
> I'm reading the Pulsar PIP and noticed another thing to take into account:
> multiple applications (each with a different parallelism) that all write
> into the same topic.

Re: Sending watermarks into Kafka

Posted by Niels Basjes <Ni...@basjes.nl>.
I'm reading the Pulsar PIP and noticed another thing to take into account:
multiple applications (each with a different parallelism) that all write
into the same topic.

On Mon, 20 Dec 2021, 10:45 Niels Basjes, <Ni...@basjes.nl> wrote:

> Hi Till,
>
> This morning I also realized what you call an 'effective watermark' is
> indeed what is needed.
> I'm going to read up on what Pulsar has planned.
>
> What I realized is that the consuming application must be aware of the
> parallelism of the producing application, which is independent of the
> partitions in the intermediate transport.
>
> Assume I produce with parallelism 2 and have 5 Kafka partitions, which I
> then read with parallelism 3; then in the consuming (parallelism 3)
> application I must wait for watermarks from each original input before I
> can continue, which is 2 inputs.
> Also, we must assume that those watermarks are created at different
> timestamps.
> So my current assessment is that the watermark records must include at
> least the timestamp, the number of the thread for this watermark and the
> total number of threads.
>
> Niels

Re: Sending watermarks into Kafka

Posted by Niels Basjes <Ni...@basjes.nl>.
Hi Till,

This morning I also realized what you call an 'effective watermark' is
indeed what is needed.
I'm going to read up on what Pulsar has planned.

What I realized is that the consuming application must be aware of the
parallelism of the producing application, which is independent of the
partitions in the intermediate transport.

Assume I produce with parallelism 2 and have 5 Kafka partitions, which I
then read with parallelism 3; then in the consuming (parallelism 3)
application I must wait for watermarks from each original input before I
can continue, which is 2 inputs.
Also, we must assume that those watermarks are created at different
timestamps.
So my current assessment is that the watermark records must include at
least the timestamp, the number of the thread for this watermark and the
total number of threads.
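
As an illustration of the 2-producer example above, here is a rough sketch of what each consuming subtask could do with such watermark records. It assumes the three fields named above arrive with every watermark and that each consuming subtask sees the watermarks of all producing threads. `UpstreamWatermarkTracker` is a made-up name, not an existing Flink class.

```java
import java.util.OptionalLong;

/** Hypothetical per-subtask tracker for watermarks of one producing application. */
final class UpstreamWatermarkTracker {
    private long[] latest;   // latest watermark timestamp per producing thread
    private boolean[] seen;  // whether a producing thread has reported at all

    /**
     * Feeds one watermark record and returns the watermark that may be emitted downstream,
     * or empty while at least one producing thread has not reported yet.
     */
    OptionalLong onWatermark(long timestamp, int threadIndex, int totalThreads) {
        if (latest == null || latest.length != totalThreads) {
            // First record, or the producer parallelism changed: start over.
            latest = new long[totalThreads];
            seen = new boolean[totalThreads];
        }
        // Keep the highest timestamp per thread; watermarks do not move backwards.
        latest[threadIndex] = seen[threadIndex] ? Math.max(latest[threadIndex], timestamp) : timestamp;
        seen[threadIndex] = true;

        long min = Long.MAX_VALUE;
        for (int i = 0; i < totalThreads; i++) {
            if (!seen[i]) {
                return OptionalLong.empty();
            }
            min = Math.min(min, latest[i]);
        }
        return OptionalLong.of(min);
    }
}
```

With a producer parallelism of 2, a watermark from thread 0 alone yields nothing; only after thread 1 has also reported does the tracker return the lower of the two timestamps.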

Niels


On Mon, Dec 20, 2021 at 10:10 AM Till Rohrmann <tr...@apache.org> wrote:

> Hi Niels,
>
> If you have multiple inputs going into a single Kafka partition, then you
> have to calculate the effective watermark by looking at the min watermark
> from all inputs. You could insert a Flink operator that takes care of it
> and then writes to a set of partitions in a 1:n relationship. Alternatively,
> you could take a look at Pulsar, which wants to support this functionality
> out of the box [1].
>
> [1] https://github.com/apache/pulsar/issues/12267
>
> Cheers,
> Till


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Sending watermarks into Kafka

Posted by Till Rohrmann <tr...@apache.org>.
Hi Niels,

If you have multiple inputs going into a single Kafka partition, then you
have to calculate the effective watermark by looking at the min watermark
from all inputs. You could insert a Flink operator that takes care of it
and then writes to a set of partitions in a 1:n relationship. Alternatively,
you could take a look at Pulsar, which wants to support this functionality
out of the box [1].

[1] https://github.com/apache/pulsar/issues/12267
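
The "min watermark from all inputs" rule can be sketched in a few lines. This is plain Java rather than a Flink operator, and the class `EffectiveWatermark` with its `register`, `update` and `current` methods is invented for this illustration. It assumes the set of inputs is known up front.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: tracks the latest watermark per input and exposes the minimum. */
final class EffectiveWatermark {
    private final Map<String, Long> latestPerInput = new HashMap<>();

    /** Registers an input that must report before the effective watermark can advance. */
    void register(String inputId) {
        latestPerInput.putIfAbsent(inputId, Long.MIN_VALUE);
    }

    /** Records a watermark for an input; a lower value than before is ignored. */
    void update(String inputId, long watermark) {
        latestPerInput.merge(inputId, watermark, Long::max);
    }

    /** The minimum over all inputs; Long.MIN_VALUE while any registered input is silent. */
    long current() {
        if (latestPerInput.isEmpty()) {
            return Long.MIN_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (long w : latestPerInput.values()) {
            min = Math.min(min, w);
        }
        return min;
    }
}
```

Because the slowest input determines the result, a single idle input holds the effective watermark back, which is the idle-partition problem described in the original post.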

Cheers,
Till

On Sun, Dec 19, 2021 at 4:46 PM Niels Basjes <Ni...@basjes.nl> wrote:

> Hi,
>
> About a year ago I spoke at the Flink Forward conference (
> https://www.youtube.com/watch?v=wqRDyrE3dwg ) about handling development
> problems regarding streaming applications and handling the lack of events
> in a stream.
> Something I spoke about towards the end of this talk was the idea to ship
> the watermarks of a Flink topology into the intermediate transport between
> applications so you wouldn't need to recreate them.
>
> At that time it was just an idea, today I'm actually trying to build that
> and see if this idea is actually possible.
>
> So the class of applications I work on usually do a keyBy on something like
> a SessionId, SensorId or IP address.
> In low traffic scenarios this means that in Kafka some partitions are
> completely idle which makes Windows/GroupBy type operations impossible (in
> my talk I explain it a lot better).
>
> I have a test setup right now to play around with this and I'm running into
> a bit of a conceptual hurdle for which I'm looking for help.
>
> My goal is to ship the watermarks from within a topology into Kafka and
> then let a follow up application extract those watermarks again and simply
> continue.
> The new SinkWriter interface has a void writeWatermark(Watermark
> watermark) method
> that seems intended for this kind of thing.
> The basic operations like writing a watermark into Kafka, reading it again
> and then recreating the watermark again works in my test setup (very messy
> code but it works).
>
> My hurdle has to do with the combination of
> - different parallelism numbers between Flink and Kafka (how do I ship 2
> watermarks into 3 partitions)
> - the fact that if you do a keyBy (both in Flink and Kafka) there is a
> likely mismatch between the Flink 'partition' and the Kafka `partition`.
> - processing speed differences between various threads (like session "A"
> needs more CPU cycles/time/processing than session "B") will lead to
> skewing of the progression between them.
> - watermarks in separate threads in a single Flink topology are not
> synchronized (they cannot and should not be).
>
> Has anyone any pointers on possible ways to handle this?
>
> Right now my only idea is to ship the watermark into all partitions (as
> they do not have a key!) and let the consuming application determine the
> "real watermark" based on the mix of watermarks coming in from the upstream
> threads.
>
> All suggestions and ideas are appreciated.
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>