Posted to dev@beam.apache.org by Jean-Baptiste Onofré <jb...@nanthrax.net> on 2017/05/10 15:51:54 UTC

Direct runner doesn't seem to finalize checkpoint "quickly"

Hi Beamers,

I'm working on some fixes in JmsIO and MqttIO.

These two IOs behave similarly on the read side:

- they consume messages from a JMS or MQTT broker
- the "pending" (not yet acked) messages are stored in the checkpoint mark. When a
new message is added to the checkpoint, we compare its timestamp with the timestamp
of the oldest pending message. This is what advances the watermark, so the
watermark is basically the timestamp of the oldest pending message.
- when the runner calls finalize on the checkpoint, we ack the messages.
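A minimal, single-threaded plain-Java sketch of the approach described above, for illustration only. BrokerMessage, PendingCheckpointMark and the ack callback are hypothetical names, not JmsIO/MqttIO code. In Beam the real checkpoint type implements UnboundedSource.CheckpointMark, whose finalizeCheckpoint() is declared to throw IOException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.function.Consumer;

// Hypothetical message type: broker-assigned id plus event timestamp in millis.
final class BrokerMessage {
    final String id;
    final long timestampMillis;

    BrokerMessage(String id, long timestampMillis) {
        this.id = id;
        this.timestampMillis = timestampMillis;
    }
}

// Sketch of the approach above: the checkpoint mark keeps the pending
// (not yet acked) messages and the oldest pending timestamp.
final class PendingCheckpointMark {
    private final List<BrokerMessage> pending = new ArrayList<>();
    private final Consumer<String> ack; // hypothetical hook that acks one message on the broker
    private long oldestPendingMillis = Long.MAX_VALUE;

    PendingCheckpointMark(Consumer<String> ack) {
        this.ack = ack;
    }

    void add(BrokerMessage message) {
        pending.add(message);
        // Compare the new message's timestamp with the oldest pending one.
        oldestPendingMillis = Math.min(oldestPendingMillis, message.timestampMillis);
    }

    // Watermark = timestamp of the oldest pending message; empty when nothing is pending.
    OptionalLong watermark() {
        return pending.isEmpty() ? OptionalLong.empty() : OptionalLong.of(oldestPendingMillis);
    }

    // Called by the runner: ack every pending message, then forget them.
    void finalizeCheckpoint() {
        for (BrokerMessage message : pending) {
            ack.accept(message.id);
        }
        pending.clear();
        oldestPendingMillis = Long.MAX_VALUE;
    }
}
```

If finalizeCheckpoint() is never called, nothing is acked and the watermark in this sketch never moves past the oldest message.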

Testing this with the direct runner, it seems finalize is never called on the
checkpoints. This means the messages are never fully consumed from the broker
(as the ack is never sent).

I tried with a fair volume of messages (1000000) and the checkpoint is still not finalized.

I have two questions:
1. What do you think about this approach: storing pending messages and advancing
the watermark this way?
2. Any idea when the direct runner calls finalize on the checkpoint?

Thanks!
Regards
JB
-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: Direct runner doesn't seem to finalize checkpoint "quickly"

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi,

I made some progress on this. Actually, the IOs seem to work fine from a
reading perspective. My issue is more with windowing/triggering in the pipeline.

First, I used FixedWindows with the default trigger (event-time based). With
this windowing, the trigger never seems to fire (I still have to check the
watermark here).
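A simplified model (not Beam code) can help with checking the watermark: with 10-second FixedWindows and zero offset, an element belongs to the window [start, start + size), and the default event-time trigger fires that window's pane once the watermark passes the end of the window. Assuming the watermark stays at the oldest pending message until finalize is called, a checkpoint that is never finalized would keep the watermark inside the first window, and the default trigger would never fire. FixedWindowModel and its methods are hypothetical names.

```java
// Simplified model of FixedWindows (zero offset) plus the default
// event-time trigger; all names here are hypothetical, not Beam API.
final class FixedWindowModel {
    private FixedWindowModel() {}

    // Start of the fixed window containing the timestamp (floorMod also handles negatives).
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Exclusive end of that window.
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    // The window's last instant is end - 1 ms, so the watermark has passed it
    // once the watermark reaches the exclusive end.
    static boolean defaultTriggerFires(long windowEndMillis, long watermarkMillis) {
        return watermarkMillis >= windowEndMillis;
    }
}
```

For example, an element at 12345 ms falls into [10000, 20000); if the watermark is held at 12345 ms by an unacked message, that window never fires in this model.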

So, I changed to FixedWindows but with a different trigger (speculative):

    .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(10))))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes()
    )

It's a bit "aggressive", but the trigger fires.
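For comparison, a simplified model of the trigger above for a single window (not Beam code; ProcessingTimeTriggerModel is a hypothetical name): each pane fires a fixed processing-time delay after its first element, regardless of the watermark, and discardingFiredPanes() means each fired pane contains only the elements that arrived since the previous firing. The model assumes arrivals are given in processing time, in ascending order, and that the window never expires.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of
// Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
// with discardingFiredPanes(), for one window that never expires.
final class ProcessingTimeTriggerModel {
    private ProcessingTimeTriggerModel() {}

    // arrivalsMillis: processing times at which elements arrive, in ascending order.
    static List<List<Long>> panes(List<Long> arrivalsMillis, long delayMillis) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> buffer = new ArrayList<>();
        long fireAt = Long.MAX_VALUE; // no processing-time timer armed yet
        for (long arrival : arrivalsMillis) {
            if (arrival >= fireAt) {
                // The timer went off before this element arrived: emit the pane.
                result.add(new ArrayList<>(buffer));
                buffer.clear(); // discarding mode: the next pane starts empty
                fireAt = Long.MAX_VALUE;
            }
            if (buffer.isEmpty()) {
                fireAt = arrival + delayMillis; // first element of a pane arms the timer
            }
            buffer.add(arrival);
        }
        if (!buffer.isEmpty()) {
            result.add(new ArrayList<>(buffer)); // the last armed timer fires eventually
        }
        return result;
    }
}
```

With a 10 s delay, arrivals at 0, 3000, 9000, 12000, 15000 and 30000 ms produce three panes: [0, 3000, 9000], [12000, 15000] and [30000].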

I'm still investigating the watermark side of things.

Regards
JB

On 05/10/2017 06:36 PM, Jean-Baptiste Onofré wrote:
> [...]


Re: Direct runner doesn't seem to finalize checkpoint "quickly"

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Thanks Thomas,

Let me try what you are proposing. I'll keep you posted.

Regards
JB

On 05/10/2017 06:33 PM, Thomas Groh wrote:
> [...]


Re: Direct runner doesn't seem to finalize checkpoint "quickly"

Posted by Thomas Groh <tg...@google.com.INVALID>.
I'm going to start with number two, because it's got an easy answer:
When performing an unbounded read, the DirectRunner will finalize a
checkpoint after it has completed a subsequent read from that split where
at least one element was read. A bounded read from an unbounded source will
never be finalized by any runner.
See
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L221
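A simplified model of that rule for a single split, for illustration only. SplitModel and ModelCheckpoint are hypothetical names, not DirectRunner classes; the linked UnboundedReadEvaluatorFactory holds the actual logic. As an assumption of the model, a read that returns no elements leaves the previous checkpoint outstanding.

```java
// Hypothetical checkpoint that only records whether it has been finalized.
final class ModelCheckpoint {
    private boolean finalized;

    void finalizeCheckpoint() {
        finalized = true;
    }

    boolean isFinalized() {
        return finalized;
    }
}

// Model of the rule described above for one split: a checkpoint is finalized
// only after a *subsequent* unbounded read from the split returned at least
// one element. Bounded reads of an unbounded source never finalize.
final class SplitModel {
    private final boolean boundedRead;
    private ModelCheckpoint outstanding; // checkpoint produced by the previous read

    SplitModel(boolean boundedRead) {
        this.boundedRead = boundedRead;
    }

    // One read of the split that produced elementsRead elements and ended with newCheckpoint.
    void read(int elementsRead, ModelCheckpoint newCheckpoint) {
        if (elementsRead == 0) {
            return; // model assumption: nothing read, the previous checkpoint stays outstanding
        }
        if (!boundedRead && outstanding != null) {
            outstanding.finalizeCheckpoint();
        }
        outstanding = newCheckpoint;
    }
}
```

In this model, a message is acked only after its split has been read again with data, and never when the source is read as bounded.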

For number one:
Having a checkpoint contain unacked messages is reasonable (in fact, required),
and acking those messages when a checkpoint is finalized is also reasonable.
However, a checkpoint must not track any records produced after the
checkpoint() call that created it. If it does, it will improperly finalize
(ack) messages that have not been committed. Creating a new checkpoint whenever
a reader is started or a checkpoint is taken, and storing the pending state in
it, is a suitable way to ensure this.
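A minimal sketch of that pattern, assuming hypothetical AckCheckpoint and FreshCheckpointReader classes (in Beam's UnboundedSource.UnboundedReader, the checkpoint() step corresponds to getCheckpointMark()): the reader starts with a fresh checkpoint and swaps in a new one every time a checkpoint is taken, so records read afterwards can never be acked by an earlier checkpoint.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical checkpoint holding the ids of messages read before it was taken.
final class AckCheckpoint {
    private final List<String> unacked = new ArrayList<>();

    void track(String messageId) {
        unacked.add(messageId);
    }

    List<String> unacked() {
        return Collections.unmodifiableList(unacked);
    }
}

// Hypothetical reader: a fresh checkpoint when the reader starts, and a new one
// every time checkpoint() is called, so a checkpoint never sees later records.
final class FreshCheckpointReader {
    private AckCheckpoint current = new AckCheckpoint(); // fresh on reader start

    void onMessage(String messageId) {
        current.track(messageId);
    }

    AckCheckpoint checkpoint() {
        AckCheckpoint taken = current;
        current = new AckCheckpoint(); // records after this call go to the new checkpoint
        return taken;
    }
}
```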

You will likely need the reader to maintain its own view of pending unacked
messages, which finalizeCheckpoint can update (in a thread-safe manner,
guarding against the reader no longer being present). You may be able to
track these at the per-checkpoint level, where a finalized checkpoint is
removed from the collection of things that hold the watermark.
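One way to sketch this, under stated assumptions (all names hypothetical): the reader keeps a thread-safe map from outstanding checkpoint ids to the oldest timestamp each one holds, plus the oldest timestamp read since the last checkpoint, and the watermark is the minimum of these. finalizeCheckpoint(), which may run on another thread, removes its entry, and skips that step when the reader is gone, modeled here by a transient reader reference that is null after deserialization.

```java
import java.io.Serializable;
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical reader that tracks which checkpoints still hold the watermark.
final class WatermarkHoldingReader {
    // Outstanding (taken, not yet finalized) checkpoint id -> oldest timestamp it holds.
    // Concurrent because finalizeCheckpoint() may run on another thread.
    private final Map<Long, Long> holds = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();
    private long oldestSinceLastCheckpoint = Long.MAX_VALUE; // reader thread only

    void onMessage(long timestampMillis) {
        oldestSinceLastCheckpoint = Math.min(oldestSinceLastCheckpoint, timestampMillis);
    }

    HeldCheckpoint checkpoint() {
        long id = nextId.getAndIncrement();
        if (oldestSinceLastCheckpoint != Long.MAX_VALUE) {
            holds.put(id, oldestSinceLastCheckpoint);
        }
        oldestSinceLastCheckpoint = Long.MAX_VALUE; // later messages belong to the next checkpoint
        return new HeldCheckpoint(id, this);
    }

    void release(long checkpointId) {
        holds.remove(checkpointId);
    }

    // Minimum over everything still unacked; empty when nothing is pending.
    OptionalLong watermark() {
        long min = oldestSinceLastCheckpoint;
        for (long ts : holds.values()) {
            min = Math.min(min, ts);
        }
        return min == Long.MAX_VALUE ? OptionalLong.empty() : OptionalLong.of(min);
    }
}

final class HeldCheckpoint implements Serializable {
    private static final long serialVersionUID = 1L;
    private final long id;
    // Not serialized: null after deserialization, i.e. when the reader is no longer present.
    private final transient WatermarkHoldingReader reader;

    HeldCheckpoint(long id, WatermarkHoldingReader reader) {
        this.id = id;
        this.reader = reader;
    }

    void finalizeCheckpoint() {
        // (Acking this checkpoint's messages on the broker would happen here.)
        if (reader != null) {
            reader.release(id); // stop holding the watermark
        }
    }
}
```

Finalizing the older checkpoint lets the watermark advance to the oldest message held by the remaining ones.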


On Wed, May 10, 2017 at 8:51 AM, Jean-Baptiste Onofré <jb...@nanthrax.net>
wrote:
> [...]