Posted to users@apex.apache.org by Mateusz Zakarczemny <m....@gmail.com> on 2018/06/17 21:35:18 UTC

Operator checkpointing not working

Hi all,
I created a simple app to test Apex fault tolerance. It is built from three
main operators:
- sequence generator - an operator which generates increasing numbers, one per
time window
- aggregator - just adds each incoming number to a list and emits the whole
list downstream
- file output - an operator which writes incoming messages to a file
To make it faulty, the aggregator operator throws an exception for 10% of
messages. The source code is here: https://github.com/Matzz/apex-example

I'm running it on the sandbox Docker image. I thought that even if the
aggregation operator is faulty, the application would checkpoint its state, so
over time the output list should get longer and longer.
Unfortunately, it looks like the app resets its state to the beginning on each
failure. Sample output:

tail -f -n 100 /tmp/stream.out

Creating FileOutput 2018-06-16T22:07:01.033
Creating aggreagator 2018-06-16T22:07:01.040
Creating FileOutput 2018-06-16T22:07:01.041
Creating FileOutput 2018-06-16T22:07:02.719
Creating aggreagator 2018-06-16T22:07:02.722
Creating FileOutput 2018-06-16T22:07:02.723
Creating FileOutput 2018-06-16T22:08:48.178
Creating aggreagator 2018-06-16T22:08:48.185
Creating FileOutput 2018-06-16T22:08:48.186
Creating FileOutput 2018-06-16T22:08:49.847
Creating aggreagator 2018-06-16T22:08:49.850
Creating FileOutput 2018-06-16T22:08:49.852
Creating FileOutput 2018-06-16T22:08:56.736
Creating aggreagator 2018-06-16T22:08:56.740
Creating FileOutput 2018-06-16T22:08:56.743
Creating FileOutput 2018-06-16T22:08:57.899
Creating aggreagator 2018-06-16T22:08:57.899
Creating FileOutput 2018-06-16T22:08:57.899
Creating FileOutput 2018-06-16T22:09:10.951
Creating FileOutput 2018-06-16T22:09:10.986
Creating aggreagator 2018-06-16T22:09:11.001
Failing sequence generator!2018-06-16T22:09:11.029
Creating FileOutput 2018-06-16T22:09:19.484
Creating FileOutput 2018-06-16T22:09:19.506
Creating aggreagator 2018-06-16T22:09:19.518
Failing sequence generator!2018-06-16T22:09:19.542
Creating FileOutput 2018-06-16T22:09:28.646
Creating FileOutput 2018-06-16T22:09:28.668
Creating aggreagator 2018-06-16T22:09:28.680
Failing sequence generator!2018-06-16T22:09:28.704
[1.0]
Creating FileOutput 2018-06-16T22:09:37.864
Creating FileOutput 2018-06-16T22:09:37.886
Creating aggreagator 2018-06-16T22:09:37.897
Failing sequence generator!2018-06-16T22:09:37.924
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
Creating FileOutput 2018-06-16T22:09:46.921
Creating FileOutput 2018-06-16T22:09:46.944
Creating aggreagator 2018-06-16T22:09:46.956
Failing sequence generator!2018-06-16T22:09:46.980
[1.0, 2.0, 3.0, 4.0]
[1.0, 2.0, 3.0, 4.0]
[1.0, 2.0, 3.0, 4.0]
[1.0, 2.0, 3.0, 4.0]
Creating FileOutput 2018-06-16T22:09:56.049
Creating FileOutput 2018-06-16T22:09:56.070
Creating aggreagator 2018-06-16T22:09:56.081
Failing sequence generator!2018-06-16T22:09:56.112
[1.0]
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]
Creating FileOutput 2018-06-16T22:10:05.213
Creating FileOutput 2018-06-16T22:10:05.232
Creating aggreagator 2018-06-16T22:10:05.241
Failing sequence generator!2018-06-16T22:10:05.266
[1.0, 2.0]
[1.0, 2.0]



Could I ask for an explanation of what I'm doing wrong?

Regards,
Mateusz Zakarczemny

Re: Operator checkpointing not working

Posted by Mateusz Zakarczemny <m....@gmail.com>.
It was a silly mistake. Thanks for the help, I really appreciate it.
The app output looks better now, but only for a while. From some point on,
the only output from the app is the failure log and the operator creation
log, with no output of the aggregated list.
To debug this, I started writing the generated random number to a file. I
suspected that the failure generator was somehow being serialized and that,
because of this, it kept generating the same number from some point on.
However, the output shows that this is not the case.
The application output is here:
https://github.com/Matzz/apex-example/blob/master/stream.out
YARN log: https://raw.githubusercontent.com/Matzz/apex-example/master/logs

Regards,
Mateusz

On Mon, 18 Jun 2018 at 16:25, Ambarish Pande <am...@gmail.com> wrote:

> Hi Mateusz,
>
> The property name should be "apex.attr.CHECKPOINT_WINDOW_COUNT"
> instead of "aptest.attr.CHECKPOINT_WINDOW_COUNT".
>
> Thanks
> Ambarish
>

Re: Operator checkpointing not working

Posted by Ambarish Pande <am...@gmail.com>.
Hi Mateusz,

The property name should be "apex.attr.CHECKPOINT_WINDOW_COUNT" instead
of "aptest.attr.CHECKPOINT_WINDOW_COUNT".

Thanks
Ambarish
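
Why a mistyped key has no visible effect can be illustrated with a small Java sketch. It is only a toy model of a configuration lookup built on java.util.Properties, not Apex's actual configuration loader. The class and method names are hypothetical, and the default of 60 windows is an assumption chosen to be consistent with a 30 s default checkpoint interval at an assumed 500 ms streaming window.

```java
import java.util.Properties;

// Toy model of a configuration lookup; hypothetical, not the Apex loader.
class CheckpointConfigLookup {
    // The key named in the reply above.
    static final String KEY = "apex.attr.CHECKPOINT_WINDOW_COUNT";
    // Assumed default window count (an assumption, see the note above).
    static final int ASSUMED_DEFAULT_WINDOW_COUNT = 60;

    // Returns the configured window count, or the default when the key is absent.
    static int resolveWindowCount(Properties props) {
        String raw = props.getProperty(KEY);
        return raw == null ? ASSUMED_DEFAULT_WINDOW_COUNT : Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        Properties mistyped = new Properties();
        mistyped.setProperty("aptest.attr.CHECKPOINT_WINDOW_COUNT", "10");

        Properties corrected = new Properties();
        corrected.setProperty("apex.attr.CHECKPOINT_WINDOW_COUNT", "10");

        // The mistyped key is never read, so the default silently applies.
        System.out.println("mistyped key  -> " + resolveWindowCount(mistyped));
        System.out.println("corrected key -> " + resolveWindowCount(corrected));
    }
}
```

In this model nothing rejects the unknown key, which matches the symptom in the thread: the setting was present in the file but the checkpoint interval did not change.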

Re: Operator checkpointing not working

Posted by Mateusz Zakarczemny <m....@gmail.com>.
I thought I had changed that by setting CHECKPOINT_WINDOW_COUNT in
https://github.com/Matzz/apex-example/blob/master/src/main/resources/META-INF/properties.xml
Should I use a different property?

On Mon, 18 Jun 2018 at 15:26, Thomas Weise <th...@apache.org> wrote:

> The default checkpoint interval is 30 s, and the aggregator appears to fail
> roughly every 10 s. If so, no state will ever get checkpointed and the
> operator will always reset to its initial state.
>
> Thomas
>

Re: Operator checkpointing not working

Posted by Thomas Weise <th...@apache.org>.
The default checkpoint interval is 30 s, and the aggregator appears to fail
roughly every 10 s. If so, no state will ever get checkpointed and the
operator will always reset to its initial state.

Thomas
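
The timing argument above can be sketched in plain Java. This is a deliberately simplified model and not Apex code: it assumes a 500 ms streaming window and a checkpoint every 60 windows (which gives the 30 s interval mentioned above), one aggregated number per window, failures at a fixed spacing instead of the random 10% in the example app, and a checkpoint counter that starts over after each restart. All names are hypothetical.

```java
// Simplified model of checkpoint and recovery timing; hypothetical, not Apex code.
class CheckpointTimingSketch {
    // Checkpoint interval in milliseconds for a given window size and window count.
    static long checkpointIntervalMillis(int windowMillis, int checkpointWindowCount) {
        return (long) windowMillis * checkpointWindowCount;
    }

    /**
     * Runs totalWindows windows, aggregating one number per window.
     * A checkpoint is taken after checkpointWindowCount windows of uninterrupted
     * running. Every failureEveryWindows windows the operator fails and its live
     * state is rolled back to the last checkpoint (empty if there is none).
     * Returns how many numbers were ever made durable by a checkpoint.
     */
    static int checkpointedSize(int totalWindows, int checkpointWindowCount, int failureEveryWindows) {
        int liveSize = 0;        // numbers held by the running operator
        int checkpointed = 0;    // numbers saved in the last checkpoint
        int sinceCheckpoint = 0; // windows since last checkpoint or restart
        int sinceFailure = 0;    // windows since last failure
        for (int w = 0; w < totalWindows; w++) {
            liveSize++;
            sinceCheckpoint++;
            sinceFailure++;
            if (sinceCheckpoint == checkpointWindowCount) {
                checkpointed = liveSize;
                sinceCheckpoint = 0;
            }
            if (sinceFailure == failureEveryWindows) {
                liveSize = checkpointed; // recover from the last checkpoint
                sinceCheckpoint = 0;
                sinceFailure = 0;
            }
        }
        return checkpointed;
    }

    public static void main(String[] args) {
        System.out.println("interval ms: " + checkpointIntervalMillis(500, 60));
        // Failure every 20 windows (about 10 s) with a checkpoint every 60 windows (30 s).
        System.out.println("checkpoint every 60 windows: " + checkpointedSize(120, 60, 20));
        // Same failure spacing with a checkpoint every 10 windows (5 s).
        System.out.println("checkpoint every 10 windows: " + checkpointedSize(120, 10, 20));
    }
}
```

With the checkpoint interval longer than the time between failures, the model never completes a checkpoint, so every recovery starts from an empty list. Once the interval is shorter than the failure spacing, state accumulates across failures.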


Re: Operator checkpointing not working

Posted by Mateusz Zakarczemny <m....@gmail.com>.
Hi Pramod,
I removed transient, but the result is the same:
https://github.com/Matzz/apex-example/blob/master/src/main/java/aptest/Aggregator.java

Creating aggregator 2018-06-18T10:42:50.582
Failing aggregator! 2018-06-18T10:42:50.707
Creating FileOutput 2018-06-18T10:42:50.848
[1.0]
[1.0, 2.0]
[1.0, 2.0, 3.0]
[1.0, 2.0, 3.0, 4.0]
[1.0, 2.0, 3.0, 4.0, 5.0]
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0]
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0]
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0]
Creating aggregator 2018-06-18T10:42:59.683
Failing aggregator! 2018-06-18T10:42:59.794
Creating FileOutput 2018-06-18T10:42:59.926
Creating aggregator 2018-06-18T10:43:08.810
Failing aggregator! 2018-06-18T10:43:08.918
Creating FileOutput 2018-06-18T10:43:08.988
[1.0]
[1.0, 2.0]
[1.0, 2.0, 3.0]
Creating FileOutput 2018-06-18T10:43:18.059
Creating aggregator 2018-06-18T10:43:18.142
Failing aggregator! 2018-06-18T10:43:18.227
[1.0]
[1.0, 2.0]
[1.0, 2.0, 3.0]
[1.0, 2.0, 3.0, 4.0]
Creating FileOutput 2018-06-18T10:43:27.130
Creating aggregator 2018-06-18T10:43:27.135
Failing aggregator! 2018-06-18T10:43:27.228
[1.0]
[1.0, 2.0]
[1.0, 2.0, 3.0]
[1.0, 2.0, 3.0, 4.0]
[1.0, 2.0, 3.0, 4.0, 5.0]
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0]



On Mon, 18 Jun 2018 at 00:16, Pramod Immaneni <pr...@gmail.com> wrote:

> Hi Mateusz,
>
> It is because you have defined the list as transient in the Aggregator.
> Transient fields are not serialized, so they are not included when the
> checkpoint is created.
>
> Thanks

Re: Operator checkpointing not working

Posted by Pramod Immaneni <pr...@gmail.com>.
Hi Mateusz,

It is because you have defined the list as transient in the Aggregator.
Transient fields are not serialized, so they are not included when the
checkpoint is created.

Thanks
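
The behaviour of transient fields described above can be reproduced with a small self-contained Java sketch. Note the assumptions: it uses the JDK's built-in java.io serialization as a stand-in for the checkpoint mechanism, which may differ from the serializer Apex actually uses, and AggregatorState is a hypothetical class, not the Aggregator from the repository.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the aggregator's state; not the code from the repository.
class AggregatorState implements Serializable {
    private static final long serialVersionUID = 1L;

    // Skipped by serialization, so its contents are lost across save and restore.
    transient List<Double> transientValues = new ArrayList<>();

    // Serialized normally, so its contents survive save and restore.
    List<Double> persistedValues = new ArrayList<>();

    void add(double value) {
        transientValues.add(value);
        persistedValues.add(value);
    }
}

class TransientDemo {
    // Serializes the state to bytes and reads it back, mimicking checkpoint plus recovery.
    static AggregatorState saveAndRestore(AggregatorState state) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(state);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (AggregatorState) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        AggregatorState state = new AggregatorState();
        state.add(1.0);
        state.add(2.0);

        AggregatorState restored = saveAndRestore(state);
        System.out.println("persisted: " + restored.persistedValues); // [1.0, 2.0]
        System.out.println("transient: " + restored.transientValues); // null
    }
}
```

With java.io serialization the transient field comes back as null. A serializer that runs the constructor on restore would instead leave it at its freshly constructed value, an empty list here. Either way the accumulated numbers are gone.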